Repository: qpid-jms-amqp-0-x
Updated Branches:
  refs/heads/6.3.x 4e127c211 -> 298fdbec8


QPID-8185: [JMS AMQP 0-x][AMQP 0-8..0-91] Stop handling incoming frames on 
session after sending channel.close

Cherry-picked from f89f6c2f45d11fc63551d0d61c17eceedd6bd247


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/commit/ca62a0c5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/tree/ca62a0c5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/diff/ca62a0c5

Branch: refs/heads/6.3.x
Commit: ca62a0c52b54c29681e8669a3fe8202300002d83
Parents: 4e127c2
Author: Alex Rudyy <[email protected]>
Authored: Thu May 10 17:02:05 2018 +0100
Committer: Keith Wall <[email protected]>
Committed: Mon May 14 10:11:24 2018 +0100

----------------------------------------------------------------------
 .../client/protocol/AMQProtocolSession.java     | 36 +++++++++++++++++++-
 1 file changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ca62a0c5/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java 
b/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 51eb4cf..9b5c6a8 100644
--- 
a/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ 
b/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -42,6 +42,8 @@ import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
@@ -229,6 +231,12 @@ public class AMQProtocolSession implements 
AMQVersionAwareProtocolSession
     @Override
     public void contentHeaderReceived(int channelId, ContentHeaderBody 
contentHeader) throws QpidException
     {
+        if (isClosedForInput(channelId))
+        {
+            _logger.debug("Ignoring content header as channel {} closed", 
channelId);
+            return;
+        }
+
         final UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) 
((channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? 
_channelId2UnprocessedMsgArray[channelId]
                                                : 
_channelId2UnprocessedMsgMap.get(channelId));
 
@@ -252,6 +260,11 @@ public class AMQProtocolSession implements 
AMQVersionAwareProtocolSession
     @Override
     public void contentBodyReceived(final int channelId, ContentBody 
contentBody) throws QpidException
     {
+        if (isClosedForInput(channelId))
+        {
+            _logger.debug("Ignoring content body as channel {} closed", 
channelId);
+            return;
+        }
         UnprocessedMessage_0_8 msg;
         final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0;
         if (fastAccess)
@@ -469,7 +482,16 @@ public class AMQProtocolSession implements 
AMQVersionAwareProtocolSession
     @Override
     public void methodFrameReceived(final int channel, final AMQMethodBody 
amqMethodBody) throws QpidException
     {
-        _protocolHandler.methodBodyReceived(channel, amqMethodBody);
+        if ( channel == 0 // frames on channel 0 are always dispatched
+             || !isClosedForInput(channel)
+             || (isClosing(channel) && (amqMethodBody instanceof 
ChannelCloseBody || amqMethodBody instanceof ChannelCloseOkBody))) // close / close-ok still pass while the channel is closing
+        {
+            _protocolHandler.methodBodyReceived(channel, amqMethodBody);
+        }
+        else
+        {
+            _logger.debug("Ignoring method {} as channel {} closed", 
amqMethodBody, channel);
+        }
     }
 
     public void notifyError(Exception error)
@@ -514,4 +536,16 @@ public class AMQProtocolSession implements 
AMQVersionAwareProtocolSession
     {
         return _methodProcessor;
     }
+
+    private boolean isClosing(final int channelId) // true when _closingChannels has an entry keyed by channelId
+    {
+        return _closingChannels.containsKey(channelId);
+    }
+
+    private boolean isClosedForInput(final int channelId) // true when channelId > 0 and the connection has no session for it, or that session reports closed
+    {
+        AMQSession session; // assigned inside the return expression below
+        return channelId > 0 && ((session = _connection.getSession(channelId)) 
== null || session.isClosed()); // channelId <= 0 short-circuits to false without a session lookup
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to