Author: rgodfrey
Date: Tue Apr 17 20:09:37 2012
New Revision: 1327268

URL: http://svn.apache.org/viewvc?rev=1327268&view=rev
Log:
QPID-3953 : [Java AMQP 1-0] Fix durable subscribers

Modified:
    
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
    
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
    
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java
    
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/security/SaslMechanisms.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java?rev=1327268&r1=1327267&r2=1327268&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
 Tue Apr 17 20:09:37 2012
@@ -20,16 +20,14 @@
  */
 package org.apache.qpid.amqp_1_0.jms.impl;
 
-import org.apache.qpid.amqp_1_0.jms.Connection;
-import org.apache.qpid.amqp_1_0.jms.ConnectionFactory;
-
+import java.net.MalformedURLException;
+import java.net.URL;
 import javax.jms.JMSException;
 import javax.jms.QueueConnection;
 import javax.jms.QueueConnectionFactory;
 import javax.jms.TopicConnection;
 import javax.jms.TopicConnectionFactory;
-import java.net.MalformedURLException;
-import java.net.URL;
+import org.apache.qpid.amqp_1_0.jms.ConnectionFactory;
 
 public class ConnectionFactoryImpl implements ConnectionFactory, 
TopicConnectionFactory, QueueConnectionFactory
 {
@@ -45,6 +43,14 @@ public class ConnectionFactoryImpl imple
     public ConnectionFactoryImpl(final String host,
                                  final int port,
                                  final String username,
+                                 final String password)
+    {
+        this(host,port,username,password,null,false);
+    }
+
+    public ConnectionFactoryImpl(final String host,
+                                 final int port,
+                                 final String username,
                                  final String password,
                                  final String clientId)
     {

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java?rev=1327268&r1=1327267&r2=1327268&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
 Tue Apr 17 20:09:37 2012
@@ -79,6 +79,14 @@ public class TopicSubscriberImpl extends
             if(!address.equals(actualAddress) || !filtersEqual(getFilters(), 
actualFilters))
             {
                 receiver.close();
+                if(isDurable())
+                {
+                    receiver = 
getSession().getClientSession().createReceiver(address,
+                            StdDistMode.COPY, AcknowledgeMode.ALO,
+                            getLinkName(), false, getFilters(),
+                            null);
+                    receiver.close();
+                }
                 receiver = 
getSession().getClientSession().createReceiver(address,
                                                                           
StdDistMode.COPY, AcknowledgeMode.ALO,
                                                                           
getLinkName(), isDurable(), getFilters(),

Modified: 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java?rev=1327268&r1=1327267&r2=1327268&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java
 Tue Apr 17 20:09:37 2012
@@ -24,10 +24,13 @@ package org.apache.qpid.amqp_1_0.transpo
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class Container
 {
 
+    private static final AtomicInteger CONTAINER_ID = new AtomicInteger(0);
+
     private String _id;
 
     public Container()
@@ -57,7 +60,7 @@ public class Container
             pid = "unknown";
         }
 
-        _id = hostname + '(' + pid + ')';
+        _id = hostname + '(' + pid + ')' + ':' + 
CONTAINER_ID.incrementAndGet();
 
     }
 

Modified: 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/security/SaslMechanisms.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/security/SaslMechanisms.java?rev=1327268&r1=1327267&r2=1327268&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/security/SaslMechanisms.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/security/SaslMechanisms.java
 Tue Apr 17 20:09:37 2012
@@ -24,10 +24,10 @@
 package org.apache.qpid.amqp_1_0.type.security;
 
 
+import java.util.Arrays;
 import org.apache.qpid.amqp_1_0.transport.SASLEndpoint;
-
-
-import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
+import org.apache.qpid.amqp_1_0.type.Symbol;
 
 public class SaslMechanisms
   implements SaslFrameBody
@@ -58,7 +58,7 @@ public class SaslMechanisms
             {
                 builder.append(',');
             }
-            
builder.append("saslServerMechanisms=").append(_saslServerMechanisms);
+            
builder.append("saslServerMechanisms=").append(_saslServerMechanisms == null ? 
"" : Arrays.asList(_saslServerMechanisms));
         }
 
         builder.append('}');

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java?rev=1327268&r1=1327267&r2=1327268&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
 Tue Apr 17 20:09:37 2012
@@ -20,31 +20,32 @@
  */
 package org.apache.qpid.server.protocol;
 
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.security.auth.callback.CallbackHandler;
 import org.apache.qpid.amqp_1_0.codec.FrameWriter;
 import org.apache.qpid.amqp_1_0.framing.AMQFrame;
 import org.apache.qpid.amqp_1_0.framing.FrameHandler;
 import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
-import org.apache.qpid.amqp_1_0.transport.*;
+import org.apache.qpid.amqp_1_0.transport.CallbackHandlerSource;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
+import org.apache.qpid.amqp_1_0.transport.Container;
+import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
 import org.apache.qpid.amqp_1_0.type.Binary;
 import org.apache.qpid.amqp_1_0.type.FrameBody;
-
 import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.server.configuration.*;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConnectionConfigType;
 import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.NetworkConnection;
 
-import javax.security.auth.callback.CallbackHandler;
-import java.io.PrintWriter;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.UUID;
-import java.util.logging.*;
-import java.util.concurrent.atomic.AtomicLong;
-
 public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, 
FrameOutputHandler
 {
     static final AtomicLong _connectionIdSource = new AtomicLong(0L);
@@ -94,6 +95,8 @@ public class ProtocolEngine_1_0_0 implem
     }
 
     private State _state = State.A;
+    
+
 
     public ProtocolEngine_1_0_0(final IApplicationRegistry appRegistry, long 
id)
     {
@@ -138,7 +141,7 @@ public class ProtocolEngine_1_0_0 implem
         _network = network;
         _sender = sender;
 
-        Container container = new Container();
+        Container container = new 
Container(_appRegistry.getBrokerId().toString());
 
         _conn = new 
ConnectionEndpoint(container,asCallbackHandlerSource(_appRegistry.getAuthenticationManager()));
         _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java?rev=1327268&r1=1327267&r2=1327268&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
 Tue Apr 17 20:09:37 2012
@@ -20,10 +20,16 @@
  */
 package org.apache.qpid.server.protocol;
 
+import java.io.PrintWriter;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.security.auth.callback.CallbackHandler;
 import org.apache.qpid.amqp_1_0.codec.FrameWriter;
 import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
 import org.apache.qpid.amqp_1_0.framing.AMQFrame;
-import org.apache.qpid.amqp_1_0.framing.FrameHandler;
 import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
 import org.apache.qpid.amqp_1_0.framing.SASLFrameHandler;
 import org.apache.qpid.amqp_1_0.transport.CallbackHandlerSource;
@@ -32,7 +38,6 @@ import org.apache.qpid.amqp_1_0.transpor
 import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
 import org.apache.qpid.amqp_1_0.type.Binary;
 import org.apache.qpid.amqp_1_0.type.FrameBody;
-
 import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.configuration.ConfigStore;
 import org.apache.qpid.server.configuration.ConnectionConfigType;
@@ -43,15 +48,6 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.NetworkConnection;
 
-import javax.security.auth.callback.CallbackHandler;
-import java.io.PrintWriter;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.UUID;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
 public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, 
FrameOutputHandler
 {
        private long _readBytes;
@@ -165,7 +161,7 @@ public class ProtocolEngine_1_0_0_SASL i
         _network = network;
         _sender = sender;
 
-        Container container = new Container();
+        Container container = new 
Container(_appRegistry.getBrokerId().toString());
 
         _conn = new ConnectionEndpoint(container, 
asCallbackHandlerSource(ApplicationRegistry.getInstance()
                                                                                
              .getAuthenticationManager()));

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1327268&r1=1327267&r2=1327268&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
 Tue Apr 17 20:09:37 2012
@@ -347,10 +347,8 @@ public class SendingLink_1_0 implements 
     {
         //TODO
         // if not durable or close
-        if(!TerminusDurability.UNSETTLED_STATE.equals(_durability) ||
-           (detach != null && Boolean.TRUE.equals(detach.getClosed())))
+        if(!TerminusDurability.UNSETTLED_STATE.equals(_durability))
         {
-
             AMQQueue queue = _subscription.getQueue();
 
             try

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1327268&r1=1327267&r2=1327268&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
 Tue Apr 17 20:09:37 2012
@@ -24,11 +24,10 @@ import org.apache.qpid.amqp_1_0.transpor
 import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.LifetimePolicy;
-import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.*;
 import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
 import org.apache.qpid.amqp_1_0.type.transaction.Coordinator;
 import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
 import org.apache.qpid.amqp_1_0.type.transport.*;
@@ -143,13 +142,6 @@ public class Session_1_0 implements Sess
                         
if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
                         {
                             
linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
-                            sendingLink.setCloseAction(new Runnable() {
-
-                                public void run()
-                                {
-                                    
linkRegistry.unregisterSendingLink(endpoint.getName());
-                                }
-                            });
                         }
                     }
                     catch(AmqpErrorException e)
@@ -163,7 +155,19 @@ public class Session_1_0 implements Sess
             }
             else
             {
-                endpoint.setSource(previousLink.getEndpoint().getSource());
+                Source newSource = (Source) endpoint.getSource();
+
+                Source oldSource = (Source) 
previousLink.getEndpoint().getSource();
+                final TerminusDurability newSourceDurable = newSource == null 
? null : newSource.getDurable();
+                if(newSourceDurable != null)
+                {
+                    oldSource.setDurable(newSourceDurable);
+                    if(newSourceDurable.equals(TerminusDurability.NONE))
+                    {
+                        linkRegistry.unregisterSendingLink(endpoint.getName());
+                    }
+                }
+                endpoint.setSource(oldSource);
                 SendingLinkEndpoint sendingLinkEndpoint = 
(SendingLinkEndpoint) endpoint;
                 previousLink.setLinkAttachment(new SendingLinkAttachment(this, 
sendingLinkEndpoint));
                 sendingLinkEndpoint.setLinkEventListener(previousLink);



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to