lrhkobe opened a new issue #457:
URL: https://github.com/apache/incubator-eventmesh/issues/457


   **The scenes of message downstream action:**
   1. normal downstream
   ```
   public void subscribe(SubscriptionItem subscriptionItem) throws Exception {
        ....
         //msg put in eventmesh,waiting client ack
        session.getPusher().unAckMsg(downStreamMsgContext.seq, 
downStreamMsgContext);
   
        session.downstreamMsg(downStreamMsgContext);
       ...
   }
   ```
   2. when downstream fail, push retry queue, execute downstream 
   ```
   private void retryHandle(DownStreamMsgContext downStreamMsgContext) {
       ...
       downStreamMsgContext.session = rechoosen;
       rechoosen.downstreamMsg(downStreamMsgContext);
       ...
   }
   ```
   3. when closeSession, the session has downstreamed msg which is unAcked, and 
there is other session which is belonged to same consumer group, the unAcked 
msg will repush to other client with same group
   ```
   private void handleUnackMsgsInSession(Session session) {
          ...
         downStreamMsgContext.session = reChooseSession;
         reChooseSession.downstreamMsg(downStreamMsgContext);
         ...
    }
   ```
   
   The scene2 and scene3 have ack problem, **because the msg does not put in 
unAckMap of the rechoosed session**. When the rechoosed session repley ack, 
this leads EventMesh can not find the msg to ack .
   
   
   **Message downstream process in EventMesh:**
   
   1. select a session(client) 
   2. put the msg in a map of selected session
   3. execute downstream action
   
   **Message ack process in EventMesh:**
   
   1. session(client) reply ack msg to EventMesh
   2. EventMesh execute ack action in `MessageAckTask.java`
   ```
           DownStreamMsgContext downStreamMsgContext = 
session.getPusher().getUnAckMsg().get(seq);
           if (downStreamMsgContext != null) {
               downStreamMsgContext.ackMsg();
               session.getPusher().getUnAckMsg().remove(seq);
           }else {
              logger.warn("MessageAckTask, seq:{}, downStreamMsgContext not in 
downStreamMap,client:{}", seq, session.getClient());
           }
   ```
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to