QPID-8185: [JMS AMQP 0-x][AMQP 0-8..0-91] Make sure that the client closes the TCP connection when sending connection.close fails
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/commit/98382759 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/tree/98382759 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/diff/98382759 Branch: refs/heads/master Commit: 983827591c27fd7d5d7289bbd9373a71728ba191 Parents: f89f6c2 Author: Alex Rudyy <[email protected]> Authored: Tue May 8 16:34:27 2018 +0100 Committer: Alex Rudyy <[email protected]> Committed: Thu May 10 17:11:06 2018 +0100 ---------------------------------------------------------------------- .../org/apache/qpid/client/AMQConnection.java | 4 ++-- .../apache/qpid/client/AMQProtocolHandler.java | 20 +++++++++++++++----- .../org/apache/qpid/client/AMQSession_0_8.java | 2 +- .../qpid/client/BasicMessageProducer_0_8.java | 12 +++++++----- .../protocol/BlockingMethodFrameListener.java | 10 +++++++++- .../apache/qpid/client/state/StateWaiter.java | 6 ++++++ .../listener/SpecificMethodFrameListener.java | 4 ++-- .../apache/qpid/client/util/BlockingWaiter.java | 6 ++++-- .../client/protocol/AMQProtocolHandlerTest.java | 2 +- 9 files changed, 47 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/AMQConnection.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/client/src/main/java/org/apache/qpid/client/AMQConnection.java index c629414..4426fb6 100644 --- a/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1203,7 +1203,7 @@ public class AMQConnection extends Closeable implements CommonConnection, Refere } catch (JMSException e) { - _logger.error("Error closing connection", e); + 
_logger.warn("Error closing connection", e); throw JMSExceptionHelper.chainJMSException(new JMSException("Error closing connection: " + e), e); } finally @@ -1271,7 +1271,7 @@ public class AMQConnection extends Closeable implements CommonConnection, Refere } catch (JMSException e) { - _logger.error("Error closing session: " + e); + _logger.warn("Error closing session: " + e); sessionException = e; } } http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java b/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java index 5d59e50..6b50f90 100644 --- a/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java +++ b/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java @@ -672,10 +672,18 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver, /** More convenient method to write a frame and wait for it's response. 
*/ public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws QpidException, FailoverException { - return writeCommandFrameAndWaitForReply(frame, new SpecificMethodFrameListener(frame.getChannel(), responseClass), + return writeCommandFrameAndWaitForReply(frame, + new SpecificMethodFrameListener(frame.getChannel(), + responseClass, + getConnectionDetails()), timeout); } + public String getConnectionDetails() + { + return getLocalAddress() + "-" + getRemoteAddress(); + } + public void closeSession(AMQSession session) throws QpidException { _protocolSession.closeSession(session); @@ -707,17 +715,19 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver, final AMQFrame frame = body.generateFrame(0); syncWrite(frame, ConnectionCloseOkBody.class, timeout); - _network.close(); - closed(); - } + } catch (AMQTimeoutException e) { - closed(); + _logger.debug("Timeout on sending connection close : " + e); } catch (FailoverException e) { _logger.debug("FailoverException interrupted connection close, ignoring as connection closed anyway."); } + finally + { + _network.close(); + } } } http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 1acb3a1..6c7738a 100644 --- a/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -929,7 +929,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe public QueueDeclareOkHandler() { - super(getChannelId(), QueueDeclareOkBody.class); + super(getChannelId(), QueueDeclareOkBody.class, getProtocolHandler().getConnectionDetails()); } public boolean processMethod(int channelId, 
AMQMethodBody frame) //throws AMQException http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index 41166e0..8412849 100644 --- a/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -362,17 +362,19 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer && (connectionDelegate80.isConfirmedPublishSupported() || (!getSession().isTransacted() && connectionDelegate80.isConfirmedPublishNonTransactionalSupported())); + AMQProtocolHandler protocolHandler = getConnection().getProtocolHandler(); if(!useConfirms) { - getConnection().getProtocolHandler().writeFrame(compositeFrame); + protocolHandler.writeFrame(compositeFrame); } else { - final PublishConfirmMessageListener frameListener = new PublishConfirmMessageListener(getChannelId()); + final PublishConfirmMessageListener frameListener = new PublishConfirmMessageListener(getChannelId(), + protocolHandler.getConnectionDetails()); try { - getConnection().getProtocolHandler().writeCommandFrameAndWaitForReply(compositeFrame, + protocolHandler.writeCommandFrameAndWaitForReply(compositeFrame, frameListener); if(frameListener.isRejected()) @@ -468,9 +470,9 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer * * @param channelId The channel id to filter incoming methods with. 
*/ - public PublishConfirmMessageListener(final int channelId) + public PublishConfirmMessageListener(final int channelId, String connectionDetails) { - super(channelId); + super(channelId, connectionDetails); } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 6618b34..5c56ad7 100644 --- a/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -55,6 +55,7 @@ import org.apache.qpid.protocol.AMQMethodListener; public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMethodEvent> implements AMQMethodListener { + private final String _connectionDetails; /** Holds the channel id for the channel upon which this listener is waiting for a response. */ private int _channelId; @@ -62,10 +63,12 @@ public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMeth * Creates a new method listener, that filters incoming method to just those that match the specified channel id. * * @param channelId The channel id to filter incoming methods with. 
+ * @param connectionDetails */ - public BlockingMethodFrameListener(int channelId) + public BlockingMethodFrameListener(int channelId, final String connectionDetails) { _channelId = channelId; + _connectionDetails = connectionDetails; } /** @@ -121,4 +124,9 @@ public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMeth } } + @Override + public String getConnectionDetails() + { + return _connectionDetails; + } } http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java index 5f0a935..ce7c03a 100644 --- a/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java +++ b/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java @@ -82,6 +82,12 @@ public class StateWaiter extends BlockingWaiter<AMQState> return _awaitStates.contains(state); } + @Override + public String getConnectionDetails() + { + return null; + } + /** * Await for the required State to be achieved within the default timeout. * @return The achieved state that was requested. 
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java b/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java index f0d7feb..9a3f733 100644 --- a/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java +++ b/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java @@ -28,9 +28,9 @@ public class SpecificMethodFrameListener extends BlockingMethodFrameListener { private final Class _expectedClass; - public SpecificMethodFrameListener(int channelId, Class expectedClass) + public SpecificMethodFrameListener(int channelId, Class expectedClass, final String connectionDetails) { - super(channelId); + super(channelId, connectionDetails); _expectedClass = expectedClass; } http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java index 66be535..23adf3c 100644 --- a/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java +++ b/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java @@ -170,8 +170,8 @@ public abstract class BlockingWaiter<T> final String errorMsg = String.format( "The server's response was not received within the time-out period of %d ms. 
" + "Possible reasons include: the server may be too busy, the network may be " - + "overloaded, or this JVM itself may be too busy to process the response.", - timeout); + + "overloaded, or this JVM itself may be too busy to process the response. [%s]", + timeout, getConnectionDetails() == null ? "" : getConnectionDetails()); _error = new AMQTimeoutException(errorMsg, null); _ready = true; } @@ -338,4 +338,6 @@ public abstract class BlockingWaiter<T> return new QpidException("Waiter was closed.", null); } + public abstract String getConnectionDetails(); + } http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java ---------------------------------------------------------------------- diff --git a/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java index 74a1809..69d180e 100644 --- a/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java +++ b/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java @@ -286,7 +286,7 @@ public class AMQProtocolHandlerTest extends QpidTestCase */ public BlockToAccessFrameListener(int channelId) { - super(channelId); + super(channelId, "Test"); _logger.info("Creating a listener:" + this); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
