Author: ritchiem
Date: Mon Oct  5 15:04:15 2009
New Revision: 821824

URL: http://svn.apache.org/viewvc?rev=821824&view=rev
Log:
QPID-1816 : Add Acknowledge tests and QuickAcking manual test helper.
Updated AcknowledgeAfterFailoverTest to correctly cover the failure cases: 
sending messages on a dirty transaction and receiving messages on a dirty 
session.

Added:
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java
Modified:
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=821824&r1=821823&r2=821824&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Mon Oct  5 15:04:15 2009
@@ -60,6 +60,7 @@
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
 
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
@@ -777,8 +778,16 @@
 
         try
         {
+            //Check that we are clean to commit.
+            if (_failedOverDirty)
+            {
+                rollback();
+
+                throw new TransactionRolledBackException("Connection failover 
has occured since last send. " +
+                                                         "Forced rollback");
+            }
+
 
-            // TGM FIXME: what about failover?
             // Acknowledge all delivered messages
             while (true)
             {
@@ -1509,6 +1518,8 @@
 
             sendRecover();
 
+            markClean();
+            
             if (!isSuspended)
             {
                 suspendChannel(false);

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java?rev=821824&r1=821823&r2=821824&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
 (original)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
 Mon Oct  5 15:04:15 2009
@@ -20,20 +20,28 @@
  */
 package org.apache.qpid.test.unit.ack;
 
+import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.ConnectionListener;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.TransactionRolledBackException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 /**
  *
  */
-public class AcknowledgeAfterFailoverTest extends AcknowledgeTest
+public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements 
ConnectionListener
 {
 
+    protected CountDownLatch _failoverCompleted = new CountDownLatch(1);
+
     @Override
     public void setUp() throws Exception
     {
@@ -46,6 +54,13 @@
         NUM_MESSAGES = 10;
     }
 
+    @Override
+    protected void init(boolean transacted, int mode) throws Exception
+    {
+        super.init(transacted, mode);
+        ((AMQConnection) _connection).setConnectionListener(this);
+    }
+
     protected void prepBroker(int count) throws Exception
     {
         if (count % 2 == 1)
@@ -107,10 +122,17 @@
     }
 
     /**
-     * @param transacted
-     * @param mode
+     * Test that Acking/Committing a message received before failover causes
+     * an exception at commit/ack time.
+     *
+     * Expected behaviour is that in:
+     * * tx mode commit() throws a transacted RolledBackException
+     * * client ack mode throws an IllegalStateException
      *
-     * @throws Exception
+     * @param transacted is this session trasacted
+     * @param mode       What ack mode should be used if not trasacted
+     *
+     * @throws Exception if something goes wrong.
      */
     protected void testDirtyAcking(boolean transacted, int mode) throws 
Exception
     {
@@ -125,27 +147,55 @@
         int count = 0;
         assertNotNull("Message " + count + " not correctly received.", msg);
         assertEquals("Incorrect message received", count, 
msg.getIntProperty(INDEX));
-        count++;
-
-        //Don't acknowledge just prep the next broker
 
+        //Don't acknowledge just prep the next broker. Without changing count
+        // Prep the new broker to have all all the messages so we can validate
+        // that they can all be correctly received.
         try
         {
-            prepBroker(count);
+
+            //Stop the connection so we can validate the number of message 
count
+            // on the queue is correct after failover
+            _connection.stop();
+            failBroker(getFailingPort());
+
+            //Get the connection to the first (main port) broker.
+            Connection connection = 
getConnection();//getConnectionFactory("connection1").getConnectionURL());
+            // Use a transaction to send messages so we can be sure they 
arrive.
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            // ensure destination is created.
+            session.createConsumer(_queue).close();
+
+            sendMessage(session, _queue, NUM_MESSAGES);
+
+            assertEquals("Wrong number of messages on queue", NUM_MESSAGES,
+                         ((AMQSession) session).getQueueDepth((AMQDestination) 
_queue));
+
+            connection.close();
+
+            //restart connection
+            _connection.start();
         }
         catch (Exception e)
         {
             fail("Unable to prep new broker," + e.getMessage());
         }
 
-        // Consume the next message
+        // Consume the next message - don't check what it is as a normal would
+        // assume it is msg 1 but as we've fallen over it is msg 0 again.
         msg = _consumer.receive(1500);
-        assertNotNull("Message " + count + " not correctly received.", msg);
-        assertEquals("Incorrect message received", count, 
msg.getIntProperty(INDEX));
 
         if (_consumerSession.getTransacted())
         {
-            _consumerSession.commit();
+            try
+            {
+                _consumerSession.commit();
+                fail("Session is dirty we should get an 
TransactionRolledBackException");
+            }
+            catch (TransactionRolledBackException trbe)
+            {
+                //expected path
+            }
         }
         else
         {
@@ -154,12 +204,32 @@
                 msg.acknowledge();
                 fail("Session is dirty we should get an 
IllegalStateException");
             }
-            catch (IllegalStateException ise)
+            catch (javax.jms.IllegalStateException ise)
             {
                 assertEquals("Incorrect Exception thrown", "has failed over", 
ise.getMessage());
+                // Recover the sesion and try again.
+                _consumerSession.recover();
             }
         }
 
+        msg = _consumer.receive(1500);
+        // Validate we now get the first message back
+        assertEquals(0, msg.getIntProperty(INDEX));
+
+        msg = _consumer.receive(1500);
+        // and the second message
+        assertEquals(1, msg.getIntProperty(INDEX));
+
+        // And now verify that we can now commit the clean session
+        if (_consumerSession.getTransacted())
+        {
+            _consumerSession.commit();
+        }
+        else
+        {
+            msg.acknowledge();
+        }
+
         assertEquals("Wrong number of messages on queue", 0,
                      ((AMQSession) 
_consumerSession).getQueueDepth((AMQDestination) _queue));
     }
@@ -169,9 +239,129 @@
         testDirtyAcking(false, Session.CLIENT_ACKNOWLEDGE);
     }
 
-    public void testDirtyTransacted() throws Exception
+    public void testDirtyAckingTransacted() throws Exception
     {
         testDirtyAcking(true, Session.SESSION_TRANSACTED);
     }
 
+    /**
+     * If a transacted session has failed over whilst it has uncommitted sent
+     * data then we need to throw a TransactedRolledbackException on commit()
+     *
+     * The alternative would be to maintain a replay buffer so that the message
+     * could be resent. This is not currently implemented
+     *
+     * @throws Exception if something goes wrong.
+     */
+    public void testDirtySendingTransacted() throws Exception
+    {
+        Session producerSession = _connection.createSession(true, 
Session.SESSION_TRANSACTED);
+
+        // Ensure we get failover notifications
+        ((AMQConnection) _connection).setConnectionListener(this);        
+
+        MessageProducer producer = producerSession.createProducer(_queue);
+
+        // Create and send message 0
+        Message msg = producerSession.createMessage();
+        msg.setIntProperty(INDEX, 0);
+        producer.send(msg);
+
+        // DON'T commit message .. fail connection
+
+        failBroker(getFailingPort());
+
+        // Ensure destination exists for sending
+        producerSession.createConsumer(_queue).close();
+
+        // Send the next message
+        msg.setIntProperty(INDEX, 1);
+        try
+        {
+            producer.send(msg);
+            fail("Should fail with Qpid as we provide early warning of the 
dirty session via a JMSException.");
+        }
+        catch (JMSException jmse)
+        {
+            assertEquals("Early warning of dirty session not correct",
+                         "Failover has occurred and session is dirty so unable 
to send.", jmse.getMessage());
+        }
+
+        // Ignore that the session is dirty and attempt to commit to validate 
the
+        // exception is thrown. AND that the above failure notification did NOT
+        // clean up the session.
+
+        try
+        {
+            producerSession.commit();
+            fail("Session is dirty we should get an 
TransactionRolledBackException");
+        }
+        catch (TransactionRolledBackException trbe)
+        {
+            // Normal path.
+        }
+
+        // Resend messages
+        msg.setIntProperty(INDEX, 0);
+        producer.send(msg);
+        msg.setIntProperty(INDEX, 1);
+        producer.send(msg);
+
+        producerSession.commit();
+
+        assertEquals("Wrong number of messages on queue", 2,
+                     ((AMQSession) 
producerSession).getQueueDepth((AMQDestination) _queue));
+    }
+
+    // AMQConnectionListener Interface.. used so we can validate that we
+    // actually failed over.
+
+    public void bytesSent(long count)
+    {
+    }
+
+    public void bytesReceived(long count)
+    {
+    }
+
+    public boolean preFailover(boolean redirect)
+    {
+        //Allow failover
+        return true;
+    }
+
+    public boolean preResubscribe()
+    {
+        //Allow failover
+        return true;
+    }
+
+    public void failoverComplete()
+    {
+        _failoverCompleted.countDown();
+    }
+
+    /**
+     * Override so we can block until failover has completd
+     *
+     * @param port
+     */
+    @Override
+    public void failBroker(int port)
+    {
+        super.failBroker(port);
+
+        try
+        {
+            if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, 
TimeUnit.MILLISECONDS))
+            {
+                fail("Failover did not occur in specified time:" + 
DEFAULT_FAILOVER_TIME);
+            }
+        }
+        catch (InterruptedException e)
+        {
+            fail("Failover was interuppted");
+        }
+    }
+
 }

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java?rev=821824&r1=821823&r2=821824&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
 (original)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
 Mon Oct  5 15:04:15 2009
@@ -28,10 +28,12 @@
 import javax.jms.Session;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class AcknowledgeOnMessageTest extends AcknowledgeTest implements 
MessageListener
 {
     private CountDownLatch _receviedAll;
+    private AtomicReference<Exception> _causeOfFailure = new 
AtomicReference<Exception>(null);
 
     @Override
     public void setUp() throws Exception
@@ -49,14 +51,21 @@
     protected void testAcking(boolean transacted, int mode) throws Exception
     {
         init(transacted, mode);
-
         _consumer.setMessageListener(this);
 
         _connection.start();
 
-        if (!_receviedAll.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS))
+        if (!_receviedAll.await(10000L, TimeUnit.MILLISECONDS))
         {
-            fail("failover did not complete");
+            fail("All messages not received.");
+        }
+
+        // Check to see if we ended due to an exception in the onMessage 
handler
+        Exception cause = _causeOfFailure.get();
+        if (cause != null)
+        {
+            cause.printStackTrace();
+            fail(cause.getMessage());
         }
 
         _consumer.close();
@@ -91,8 +100,17 @@
         }
     }
 
+    /**
+     * Pass the given exception back to the waiting thread to fail the test 
run.
+     * @param e The exception that is causing the test to fail.  
+     */
     protected void fail(Exception e)
     {
-
+       _causeOfFailure.set(e);
+        // End the test.
+        while (_receviedAll.getCount() != 0)
+        {
+            _receviedAll.countDown();
+        }
     }
 }

Added: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java?rev=821824&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java
 (added)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java
 Mon Oct  5 15:04:15 2009
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.jms.Session;
+
+import javax.jms.Message;
+import javax.jms.Queue;
+
+public class FailoverBeforeConsumingRecoverTest extends RecoverTest
+{
+
+    @Override
+    protected void initTest() throws Exception
+    {
+        super.initTest();
+        failBroker(getFailingPort());
+
+        Queue queue = _consumerSession.createQueue(getTestQueueName());
+        sendMessage(_connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE), queue, SENT_COUNT);        
+    }
+}

Added: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java?rev=821824&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java
 (added)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java
 Mon Oct  5 15:04:15 2009
@@ -0,0 +1,148 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+/**
+ * This is a quick manual test to validate acking after failover with a
+ * transacted session.
+ *
+ * Start an external broker then run this test. Std Err will print.
+ * Sent Message: 1
+ * Received Message: 1
+ *
+ * You can then restart the external broker, which will cause failover, which
+ * will be complete when the following appears.
+ *
+ * Failover Complete
+ *
+ * A second message send/receive cycle is then done to validate that the
+ * connection/session are still working.
+ *
+ */
+public class QuickAcking extends QpidTestCase implements ConnectionListener
+{
+    protected AMQConnection _connection;
+    protected Queue _queue;
+    protected Session _session;
+    protected MessageConsumer _consumer;
+    private CountDownLatch _failedOver;
+    private static final String INDEX = "INDEX";
+    private int _count = 0;
+
+    public void setUp()
+    {
+        // Prevent broker startup. Broker must be run manually.
+    }
+
+    public void test() throws Exception
+    {
+        _failedOver = new CountDownLatch(1);
+
+        _connection = new 
AMQConnection("amqp://guest:gu...@client/test?brokerlist='localhost?retries='20'&connectdelay='2000''");
+
+        _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+        _queue = _session.createQueue("QAtest");
+        _consumer = _session.createConsumer(_queue);
+        _connection.setConnectionListener(this);
+        _connection.start();
+
+        sendAndReceive();
+
+        _failedOver.await();
+
+        sendAndReceive();
+
+    }
+
+    private void sendAndReceive()
+            throws Exception
+    {
+        sendMessage();
+
+        Message message = _consumer.receive();
+
+        if (message.getIntProperty(INDEX) != _count)
+        {
+            throw new Exception("Incorrect message recieved:" + _count);
+        }
+
+        if (_session.getTransacted())
+        {
+            _session.commit();
+        }
+        System.err.println("Recevied Message:" + _count);
+    }
+
+    private void sendMessage() throws JMSException
+    {
+        MessageProducer producer = _session.createProducer(_queue);
+        Message message = _session.createMessage();
+        _count++;
+        message.setIntProperty(INDEX, _count);
+
+        producer.send(message);
+        if (_session.getTransacted())
+        {
+            _session.commit();
+        }
+        producer.close();
+
+        System.err.println("Sent Message:" + _count);
+    }
+
+    public void bytesSent(long count)
+    {
+        //To change body of implemented methods use File | Settings | File 
Templates.
+    }
+
+    public void bytesReceived(long count)
+    {
+        //To change body of implemented methods use File | Settings | File 
Templates.
+    }
+
+    public boolean preFailover(boolean redirect)
+    {
+        return true;
+    }
+
+    public boolean preResubscribe()
+    {
+        return true;
+    }
+
+    public void failoverComplete()
+    {
+        System.err.println("Failover Complete");
+        _failedOver.countDown();
+    }
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to