Author: robbie
Date: Mon Feb 13 00:30:34 2012
New Revision: 1243379

URL: http://svn.apache.org/viewvc?rev=1243379&view=rev
Log:
QPID-3829: use a seperate object for reference checking to stop the AMQMessage 
holding its underlying 0-8/0-9/0-9-1 connection/io objects in memory after they 
are closed. Also stops an NPE on the 0-8/0-9/0-9-1 subscriptions when 
evaluating no-local after store recovery.

Enables NoLocalAfterRecoveryTest again, though updated to make it simpler and 
more reliable. This test should be removed if changes for QPID-3605 are 
undertaken.

Modified:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
    qpid/trunk/qpid/java/test-profiles/JavaExcludes
    qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1243379&r1=1243378&r2=1243379&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Mon Feb 13 00:30:34 2012
@@ -1108,7 +1108,7 @@ public class AMQChannel implements Sessi
         AMQMessage message = new 
AMQMessage(incomingMessage.getStoredMessage());
 
         message.setExpiration(incomingMessage.getExpiration());
-        message.setClientIdentifier(_session);
+        message.setConnectionIdentifier(_session.getReference());
         return message;
     }
 

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java?rev=1243379&r1=1243378&r2=1243379&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
 Mon Feb 13 00:30:34 2012
@@ -58,7 +58,7 @@ public class AMQMessage extends Abstract
 
     private final long _size;
 
-    private Object _sessionIdentifier;
+    private Object _connectionIdentifier;
     private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | 
DELIVERED_TO_CONSUMER);
 
     public AMQMessage(StoredMessage<MessageMetaData> handle)
@@ -218,19 +218,15 @@ public class AMQMessage extends Abstract
     }
 
 
-    public Object getPublisherIdentifier()
+    public Object getConnectionIdentifier()
     {
-        //todo store sessionIdentifier/client id with message in store
-        //Currently the _sessionIdentifier will be null if the message has been
-        // restored from a message Store
-
-        return _sessionIdentifier;
+        return _connectionIdentifier;
 
     }
 
-    public void setClientIdentifier(final Object sessionIdentifier)
+    public void setConnectionIdentifier(final Object connectionIdentifier)
     {
-        _sessionIdentifier = sessionIdentifier;
+        _connectionIdentifier = connectionIdentifier;
     }
 
 

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1243379&r1=1243378&r2=1243379&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
 Mon Feb 13 00:30:34 2012
@@ -132,7 +132,8 @@ public class AMQProtocolEngine implement
     private Subject _authorizedSubject;
     private MethodDispatcher _dispatcher;
 
-    private final long _sessionID;
+    private final long _connectionID;
+    private Object _reference = new Object();
 
     private AMQPConnectionActor _actor;
     private LogSubject _logSubject;
@@ -170,7 +171,7 @@ public class AMQProtocolEngine implement
         _codecFactory = new AMQCodecFactory(true, this);
 
         setNetworkConnection(network);
-        _sessionID = connectionId;
+        _connectionID = connectionId;
 
         _actor = new AMQPConnectionActor(this, 
virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
 
@@ -203,7 +204,7 @@ public class AMQProtocolEngine implement
 
     public long getSessionID()
     {
-        return _sessionID;
+        return _connectionID;
     }
 
     public LogActor getLogActor()
@@ -969,11 +970,6 @@ public class AMQProtocolEngine implement
         return getMethodRegistry();
     }
 
-    public Object getClientIdentifier()
-    {
-        return _network.getRemoteAddress();
-    }
-
     public VirtualHost getVirtualHost()
     {
         return _virtualHost;
@@ -1464,4 +1460,8 @@ public class AMQProtocolEngine implement
 
     }
 
+    public Object getReference()
+    {
+        return _reference;
+    }
 }

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=1243379&r1=1243378&r2=1243379&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
 Mon Feb 13 00:30:34 2012
@@ -172,7 +172,7 @@ public interface AMQProtocolSession exte
 
     void setClientProperties(FieldTable clientProperties);
 
-    Object getClientIdentifier();
+    Object getReference();
 
     VirtualHost getVirtualHost();
 

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1243379&r1=1243378&r2=1243379&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
 Mon Feb 13 00:30:34 2012
@@ -475,10 +475,6 @@ public abstract class SubscriptionImpl i
 
     public boolean hasInterest(QueueEntry entry)
     {
-
-
-
-
         //check that the message hasn't been rejected
         if (entry.isRejectedBy(getSubscriptionID()))
         {
@@ -490,22 +486,17 @@ public abstract class SubscriptionImpl i
 
         if (_noLocal)
         {
-
             AMQMessage message = (AMQMessage) entry.getMessage();
 
-            //todo - client id should be recorded so we don't have to handle
-            // the case where this is null.
-            final Object publisher = message.getPublisherIdentifier();
+            final Object publisherReference = 
message.getConnectionIdentifier();
 
             // We don't want local messages so check to see if message is one 
we sent
-            Object localInstance = getProtocolSession();
+            Object localReference = getProtocolSession().getReference();
 
-            if(publisher.equals(localInstance))
+            if(publisherReference != null && 
publisherReference.equals(localReference))
             {
                 return false;
             }
-
-
         }
 
 

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java?rev=1243379&r1=1243378&r2=1243379&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
 (original)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
 Mon Feb 13 00:30:34 2012
@@ -20,14 +20,8 @@
  */
 package org.apache.qpid.server.persistent;
 
-import org.apache.commons.configuration.XMLConfiguration;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.server.store.DerbyMessageStore;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.ArrayList;
+import java.util.List;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -36,60 +30,28 @@ import javax.jms.MessageConsumer;
 import javax.jms.Session;
 import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 /**
- * QPID-1813 : We do not store the client id with a message so on store restart
- * that information is lost and we are unable to perform no local checks.
- *
- * QPID-1813 highlights the lack of testing here as the broker will NPE as it
- * assumes that the client id of the publisher will always exist
+ * Verifies that after recovery, a new Connection with no-local in use is
+ * able to receive messages sent prior to the broker restart.
  */
-public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase implements 
ConnectionListener
+public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase
 {
     protected final String MY_TOPIC_SUBSCRIPTION_NAME = this.getName();
     protected static final int SEND_COUNT = 10;
-    private CountDownLatch _failoverComplete = new CountDownLatch(1);
-
-    protected ConnectionURL _connectionURL;
-
-    @Override
-    protected void setUp() throws Exception
-    {
-
-        XMLConfiguration configuration = new XMLConfiguration(_configFile);
-        configuration.setProperty("virtualhosts.virtualhost.test.store.class", 
"org.apache.qpid.server.store.DerbyMessageStore");
-        configuration.setProperty("virtualhosts.virtualhost.test.store."+ 
DerbyMessageStore.ENVIRONMENT_PATH_PROPERTY,
-                                  System.getProperty("QPID_WORK", 
System.getProperty("java.io.tmpdir")) + File.separator + 
"derbyDB-NoLocalAfterRecoveryTest");
-
-        File tmpFile = File.createTempFile("configFile", "test");
-        tmpFile.deleteOnExit();
-        configuration.save(tmpFile);
-
-        _configFile = tmpFile;
-        _connectionURL = getConnectionURL();
-
-        BrokerDetails details = _connectionURL.getBrokerDetails(0);
-
-        // This will attempt to failover for 3 seconds.
-        // Local testing suggests failover takes 2 seconds
-        details.setProperty(BrokerDetails.OPTIONS_RETRY, "10");
-        details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, "500");
-
-        super.setUp();        
-    }
 
     public void test() throws Exception
     {
+        if(!isBrokerStorePersistent())
+        {
+            fail("This test requires a broker with a persistent store");
+        }
 
-        Connection connection = getConnection(_connectionURL);
+        Connection connection = getConnection();
         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
-
-        Topic topic = (Topic) getInitialContext().lookup("topic");
+        Topic topic = session.createTopic(MY_TOPIC_SUBSCRIPTION_NAME);
 
         TopicSubscriber noLocalSubscriber = session.
                 createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + 
"-NoLocal",
@@ -99,88 +61,40 @@ public class NoLocalAfterRecoveryTest ex
                 createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + 
"-Normal",
                                         null, false);
 
-        List<Message> sent = sendMessage(session, topic, SEND_COUNT);
-
-        session.commit();
-
-        assertEquals("Incorrect number of messages sent",
-                     SEND_COUNT, sent.size());
-
+        sendMessage(session, topic, SEND_COUNT);
 
         // Check messages can be received as expected.
         connection.start();
 
-        assertTrue("No Local Subscriber is not a no-local subscriber",
-                   noLocalSubscriber.getNoLocal());
-
-        assertFalse("Normal Subscriber is a no-local subscriber",
-                    normalSubscriber.getNoLocal());
-
-
         List<Message> received = receiveMessage(noLocalSubscriber, SEND_COUNT);
         assertEquals("No Local Subscriber Received messages", 0, 
received.size());
 
         received = receiveMessage(normalSubscriber, SEND_COUNT);
         assertEquals("Normal Subscriber Received no messages",
                      SEND_COUNT, received.size());
+        session.commit();
+        connection.close();
 
-
-        ((AMQConnection)connection).setConnectionListener(this);
-
+        //We didn't receive the messages on the durable queue for the no-local 
subscriber
+        //so they are still on the broker. Restart the broker, prompting their 
recovery.
         restartBroker();
 
+        Connection connection2 = getConnection();
+        connection2.start();
 
-        //Await
-        if (!_failoverComplete.await(4000L, TimeUnit.MILLISECONDS))
-        {
-            fail("Failover Failed to compelete");
-        }
-
-        session.rollback();
-
-        //Failover will restablish our clients
-        assertTrue("No Local Subscriber is not a no-local subscriber",
-                   noLocalSubscriber.getNoLocal());
-
-        assertFalse("Normal Subscriber is a no-local subscriber",
-                    normalSubscriber.getNoLocal());
+        Session session2 = connection2.createSession(true, 
Session.SESSION_TRANSACTED);
+        Topic topic2 = session2.createTopic(MY_TOPIC_SUBSCRIPTION_NAME);
 
+        TopicSubscriber noLocalSubscriber2 = session2.
+                createDurableSubscriber(topic2, MY_TOPIC_SUBSCRIPTION_NAME + 
"-NoLocal",
+                                        null, true);
 
-        // NOTE : here that the NO-local subscriber actually now gets ALL the
-        // messages as the connection has failed and they are consuming on a
-        // different connnection to the one that was published on.
-        received = receiveMessage(noLocalSubscriber, SEND_COUNT);
+        // The NO-local subscriber should now get ALL the messages
+        // as they are being consumed on a different connection to
+        // the one that they were published on.
+        received = receiveMessage(noLocalSubscriber2, SEND_COUNT);
+        session2.commit();
         assertEquals("No Local Subscriber Received messages", SEND_COUNT, 
received.size());
-
-        received = receiveMessage(normalSubscriber, SEND_COUNT);
-        assertEquals("Normal Subscriber Received no messages",
-                     SEND_COUNT, received.size());
-
-        //leave the store in a clean state.
-        session.commit();
-    }
-
-    protected List<Message> assertReceiveMessage(MessageConsumer 
messageConsumer,
-                                                 int count) throws JMSException
-    {
-
-        List<Message> receivedMessages = new ArrayList<Message>(count);
-        for (int i = 0; i < count; i++)
-        {
-            Message received = messageConsumer.receive(1000);
-
-            if (received != null)
-            {
-                receivedMessages.add(received);
-            }
-            else
-            {
-                fail("Only "
-                     + receivedMessages.size() + "/" + count + " received.");
-            }
-        }
-
-        return receivedMessages;
     }
 
     protected List<Message> receiveMessage(MessageConsumer messageConsumer,
@@ -204,29 +118,4 @@ public class NoLocalAfterRecoveryTest ex
 
         return receivedMessages;
     }
-
-    public void bytesSent(long count)
-    {
-
-    }
-
-    public void bytesReceived(long count)
-    {
-
-    }
-
-    public boolean preFailover(boolean redirect)
-    {
-        return true;
-    }
-
-    public boolean preResubscribe()
-    {
-        return true;
-    }
-
-    public void failoverComplete()
-    {
-        _failoverComplete.countDown();
-    }
 }

Modified: qpid/trunk/qpid/java/test-profiles/JavaExcludes
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaExcludes?rev=1243379&r1=1243378&r2=1243379&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/JavaExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/JavaExcludes Mon Feb 13 00:30:34 2012
@@ -28,9 +28,6 @@ org.apache.qpid.test.client.queue.QueueP
 //Moved from JavaStandaloneExcludes when it was removed
 ///////////////////////////////////////////////////////
 
-//QPID-1818, QPID-1821 : Client code path does not correctly restore a 
transacted session after failover.
-org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#*
-
 //XA functionality is not fully implemented yet
 org.apache.qpid.jms.xa.XAResourceTest#*
 

Modified: qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes?rev=1243379&r1=1243378&r2=1243379&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes Mon Feb 13 
00:30:34 2012
@@ -18,6 +18,7 @@
 //
 
 //These tests require a persistent store
+org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#*
 org.apache.qpid.server.store.PersistentStoreTest#*
 
org.apache.qpid.test.unit.ack.ClientAcknowledgeTest#testClientAckWithLargeFlusherPeriod
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to