This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new af8a6cdf9 [ISSUE #3348]Modify SessionContext's public attributes to private
     new bc2f4a13e Merge pull request #3349 from mxsm/eventmesh-3348
af8a6cdf9 is described below

commit af8a6cdf9d9a8b573466a98e0d798ae3ddf6cd39
Author: mxsm <[email protected]>
AuthorDate: Mon Mar 6 00:01:48 2023 +0800

    [ISSUE #3348]Modify SessionContext's public attributes to private
---
 .../core/protocol/tcp/client/group/ClientSessionGroupMapping.java | 2 +-
 .../runtime/core/protocol/tcp/client/session/Session.java         | 8 ++++----
 .../runtime/core/protocol/tcp/client/session/SessionContext.java  | 8 ++++++--
 .../runtime/core/protocol/tcp/client/task/UnSubscribeTask.java    | 4 ++--
 .../apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java | 6 +++---
 5 files changed, 16 insertions(+), 12 deletions(-)

diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
index 5670bdd3c..9f219a321 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
@@ -277,7 +277,7 @@ public class ClientSessionGroupMapping {
      * @param session
      */
     private void cleanSubscriptionInSession(Session session) throws Exception {
-        for (SubscriptionItem item : 
session.getSessionContext().subscribeTopics.values()) {
+        for (SubscriptionItem item : 
session.getSessionContext().getSubscribeTopics().values()) {
             ClientGroupWrapper clientGroupWrapper = 
Objects.requireNonNull(session.getClientGroupWrapper().get());
             clientGroupWrapper.removeSubscription(item, session);
             if (!clientGroupWrapper.hasSubscription(item.getTopic())) {
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
index 52fcd5180..d5a8f4d0a 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
@@ -169,7 +169,7 @@ public class Session {
 
     public void subscribe(List<SubscriptionItem> items) throws Exception {
         for (SubscriptionItem item : items) {
-            sessionContext.subscribeTopics.putIfAbsent(item.getTopic(), item);
+            sessionContext.getSubscribeTopics().putIfAbsent(item.getTopic(), 
item);
             Objects.requireNonNull(clientGroupWrapper.get()).subscribe(item);
 
             
Objects.requireNonNull(clientGroupWrapper.get()).getMqProducerWrapper().getMeshMQProducer()
@@ -182,7 +182,7 @@ public class Session {
 
     public void unsubscribe(List<SubscriptionItem> items) throws Exception {
         for (SubscriptionItem item : items) {
-            sessionContext.subscribeTopics.remove(item.getTopic());
+            sessionContext.getSubscribeTopics().remove(item.getTopic());
             
Objects.requireNonNull(clientGroupWrapper.get()).removeSubscription(item, this);
 
             if 
(!Objects.requireNonNull(clientGroupWrapper.get()).hasSubscription(item.getTopic()))
 {
@@ -195,7 +195,7 @@ public class Session {
     public EventMeshTcpSendResult upstreamMsg(Header header, CloudEvent event, 
SendCallback sendCallback,
         long startTime, long taskExecuteTime) {
         String topic = event.getSubject();
-        sessionContext.sendTopics.putIfAbsent(topic, topic);
+        sessionContext.getSendTopics().putIfAbsent(topic, topic);
         return sender.send(header, event, sendCallback, startTime, 
taskExecuteTime);
     }
 
@@ -358,7 +358,7 @@ public class Session {
             return false;
         }
 
-        if (!sessionContext.subscribeTopics.containsKey(topic)) {
+        if (!sessionContext.getSubscribeTopics().containsKey(topic)) {
             log.warn("session is not available because session has not 
subscribe topic:{},client:{}", topic, client);
             return false;
         }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java
index 2751df10b..8182bfc59 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java
@@ -24,13 +24,17 @@ import org.apache.commons.lang3.time.DateFormatUtils;
 
 import java.util.concurrent.ConcurrentHashMap;
 
+import lombok.Getter;
+
 public class SessionContext {
 
     private Session session;
 
-    public ConcurrentHashMap<String, String> sendTopics = new 
ConcurrentHashMap<String, String>();
+    @Getter
+    private final ConcurrentHashMap<String, String> sendTopics = new 
ConcurrentHashMap<>(64);
 
-    public ConcurrentHashMap<String, SubscriptionItem> subscribeTopics = new 
ConcurrentHashMap<String, SubscriptionItem>();
+    @Getter
+    private final ConcurrentHashMap<String/*Topic*/, SubscriptionItem> 
subscribeTopics = new ConcurrentHashMap<>(64);
 
     public long createTime = System.currentTimeMillis();
 
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java
index 77dc5cf1b..99bc12f59 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java
@@ -55,8 +55,8 @@ public class UnSubscribeTask extends AbstractTask {
         try {
             synchronized (session) {
                 List<SubscriptionItem> topics = new 
ArrayList<SubscriptionItem>();
-                if 
(MapUtils.isNotEmpty(session.getSessionContext().subscribeTopics)) {
-                    for (Map.Entry<String, SubscriptionItem> entry : 
session.getSessionContext().subscribeTopics.entrySet()) {
+                if 
(MapUtils.isNotEmpty(session.getSessionContext().getSubscribeTopics())) {
+                    for (Map.Entry<String, SubscriptionItem> entry : 
session.getSessionContext().getSubscribeTopics().entrySet()) {
                         topics.add(entry.getValue());
                     }
                     session.unsubscribe(topics);
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
index 596c4361a..2c4804440 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
@@ -112,14 +112,14 @@ public class EventMeshTcpMonitor {
                 AtomicLong deliveredMsgsCount = 
session.getPusher().getDeliveredMsgsCount();
                 AtomicLong deliveredFailCount = 
session.getPusher().getDeliverFailMsgsCount();
                 int unAckMsgsCount = session.getPusher().getTotalUnackMsgs();
-                int sendTopics = session.getSessionContext().sendTopics.size();
-                int subscribeTopics = 
session.getSessionContext().subscribeTopics.size();
+                int sendTopics = 
session.getSessionContext().getSendTopics().size();
+                int subscribeTopics = 
session.getSessionContext().getSubscribeTopics().size();
 
                 
tcpLogger.info("session|deliveredFailCount={}|deliveredMsgsCount={}|unAckMsgsCount={}|sendTopics={}|subscribeTopics={}|user={}",
                     deliveredFailCount.longValue(), 
deliveredMsgsCount.longValue(),
                     unAckMsgsCount, sendTopics, subscribeTopics, 
session.getClient());
 
-                
topicSet.addAll(session.getSessionContext().subscribeTopics.keySet());
+                
topicSet.addAll(session.getSessionContext().getSubscribeTopics().keySet());
             }
             tcpSummaryMetrics.setSubTopicNum(topicSet.size());
             
tcpSummaryMetrics.setAllConnections(EventMeshTcpConnectionHandler.connections.get());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to