Repository: qpid-jms-amqp-0-x Updated Branches: refs/heads/master e4e2b6b20 -> 983827591
QPID-8185: [JMS AMQP 0-x][AMQP 0-8..0-91] Stop handling incoming frames on session after sending channel.close Project: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/commit/f89f6c2f Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/tree/f89f6c2f Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/diff/f89f6c2f Branch: refs/heads/master Commit: f89f6c2f45d11fc63551d0d61c17eceedd6bd247 Parents: e4e2b6b Author: Alex Rudyy <[email protected]> Authored: Thu May 10 17:02:05 2018 +0100 Committer: Alex Rudyy <[email protected]> Committed: Thu May 10 17:10:57 2018 +0100 ---------------------------------------------------------------------- .../client/protocol/AMQProtocolSession.java | 36 +++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/f89f6c2f/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 51eb4cf..9b5c6a8 100644 --- a/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -42,6 +42,8 @@ import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; @@ -229,6 +231,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession @Override public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws QpidException { + if (isClosedForInput(channelId)) + { + _logger.debug("Ignoring content header as channel {} closed", channelId); + return; + } + final UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId] : _channelId2UnprocessedMsgMap.get(channelId)); @@ -252,6 +260,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession @Override public void contentBodyReceived(final int channelId, ContentBody contentBody) throws QpidException { + if (isClosedForInput(channelId)) + { + _logger.debug("Ignoring content body as channel {} closed", channelId); + return; + } UnprocessedMessage_0_8 msg; final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0; if (fastAccess) @@ -469,7 +482,16 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession @Override public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws QpidException { - _protocolHandler.methodBodyReceived(channel, amqMethodBody); + if ( channel == 0 + || !isClosedForInput(channel) + || (isClosing(channel) && (amqMethodBody instanceof ChannelCloseBody || amqMethodBody instanceof ChannelCloseOkBody))) + { + _protocolHandler.methodBodyReceived(channel, amqMethodBody); + } + else + { + _logger.debug("Ignoring method {} as channel {} closed on {}", amqMethodBody, channel); + } } public void notifyError(Exception error) @@ -514,4 +536,16 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { return _methodProcessor; } + + private boolean isClosing(final int channelId) + { + return _closingChannels.containsKey(channelId); + } + + private boolean isClosedForInput(final int channelId) + { + AMQSession session; + return channelId > 0 && ((session = _connection.getSession(channelId)) == null || session.isClosed()); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
