Author: ritchiem
Date: Tue Oct 13 11:31:09 2009
New Revision: 824704
URL: http://svn.apache.org/viewvc?rev=824704&view=rev
Log:
QPID-1950 : Simplified the connection by using the default URL and configuring
retry rather than using the default failover URL that has multiple brokers
defined. Whilst this may not stop the test failing it will make the log files
simpler.
Updated FailoverHandler to ensure that any pending write is sync'd if possible.
Also updated Test to perform a synchronous operation after the ack to ensure it
arrives at the broker, QPID-2138 highlights that it doesn't get there due to
what appears to be Mina propagating the exception ahead of the data.
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?rev=824704&r1=824703&r2=824704&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
Tue Oct 13 11:31:09 2009
@@ -147,8 +147,16 @@
// So lets make a new one.
_amqProtocolHandler.setStateManager(new AMQStateManager());
- // Close the session, false says don't wait for it to close, just
close it.
-
_amqProtocolHandler.getProtocolSession().closeProtocolSession(false);
+ // Close the session, we need to wait for it to close as there may
have
+ // been data in transit such as an ack that is still valid to send.
+ //
+ // While we are allowing data to continue to be written to the
+ // socket assuming the connection is still valid, we do not
consider
+ // the possibility that the problem that triggered failover was
+ // entirely client side. In that situation the socket will still be
+ // open and the we should really send a ConnectionClose to be AMQP
+ // compliant.
+ _amqProtocolHandler.getProtocolSession().closeProtocolSession();
// Use a fresh new StateManager for the reconnection attempts
_amqProtocolHandler.setStateManager(new AMQStateManager());
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java?rev=824704&r1=824703&r2=824704&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
Tue Oct 13 11:31:09 2009
@@ -179,9 +179,16 @@
messages.remove(0).getIntProperty("count"),
received.getIntProperty("count"));
- // Allow ack to be sent to broker, by performing a synchronous command
- // along the session.
-// _session.createConsumer(_session.createTemporaryQueue()).close();
+ // When the Exception is received by the underlying IO layer it will
+ // initiate failover. The first step of which is to ensure that the
+ // existing conection is closed. So in this situation the connection
+ // will be flushed casuing the above ACK to be sent to the broker.
+ //
+ // That said:
+ // when the socket close is detected on the server it will rise up the
+ // Mina filter chain and interrupt processing.
+ // this has been raised as QPID-2138
+ _session.createConsumer(_session.createTemporaryQueue()).close();
//Retain IO Layer
AMQProtocolSession protocolSession =
_connection.getProtocolHandler().getProtocolSession();
@@ -261,8 +268,14 @@
private void initialiseConnection()
throws Exception
{
- //Create Connection
- _connection = (AMQConnection) getConnection();
+ //Create Connection using the default connection URL. i.e. not the
Failover URL that would be used by default
+ _connection = (AMQConnection)
getConnection(getConnectionFactory("default").getConnectionURL());
+ // The default connection does not have any retries configured so
+ // Allow this connection to retry so that we can block on the failover.
+ // The alternative would be to use the getConnection() default.
However,
+ // this would add additional complexity in the logging as a second
+ // broker is defined in that url. We do not need it for this test.
+ _connection.getFailoverPolicy().getCurrentMethod().setRetries(1);
_connection.setConnectionListener(this);
_session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]