Author: gtully
Date: Thu Mar 31 15:11:09 2011
New Revision: 1087330

URL: http://svn.apache.org/viewvc?rev=1087330&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3253 - Support Temporary Destinations 
in a network without advisories. Allow the connection id generator prefix to be 
set via a connection factory such that temp identies can be configured such 
that they are suitable for inclusion in a network list of statically included 
destintions. Allow auto recreation of temp destinations by a new producer and 
tie lifecycle to the producers connection. This allows configurable support for 
request reply with temps in a network with advisory support disabled

Added:
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
   (with props)
Modified:
    
activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnection.java
    
activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java

Modified: 
activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnection.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnection.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- 
activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnection.java
 (original)
+++ 
activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnection.java
 Thu Mar 31 15:11:09 2011
@@ -30,8 +30,9 @@ public class CamelConnection extends Act
 
     private CamelContext camelContext;
 
-    protected CamelConnection(Transport transport, IdGenerator 
clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
-        super(transport, clientIdGenerator, factoryStats);
+    protected CamelConnection(Transport transport, IdGenerator 
clientIdGenerator,
+                              IdGenerator connectionIdGenerator, JMSStatsImpl 
factoryStats) throws Exception {
+        super(transport, clientIdGenerator, connectionIdGenerator, 
factoryStats);
     }
 
     public CamelContext getCamelContext() {

Modified: 
activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- 
activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java
 (original)
+++ 
activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java
 Thu Mar 31 15:11:09 2011
@@ -45,7 +45,7 @@ public class CamelConnectionFactory exte
     // Implementation methods
     //-----------------------------------------------------------------------
     protected CamelConnection createActiveMQConnection(Transport transport, 
JMSStatsImpl stats) throws Exception {
-        CamelConnection connection = new CamelConnection(transport, 
getClientIdGenerator(), stats);
+        CamelConnection connection = new CamelConnection(transport, 
getClientIdGenerator(), getConnectionIdGenerator(), stats);
         CamelContext context = getCamelContext();
         if (context != null) {
             connection.setCamelContext(context);

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
 Thu Mar 31 15:11:09 2011
@@ -200,7 +200,7 @@ public class ActiveMQConnection implemen
      * @param factoryStats
      * @throws Exception
      */
-    protected ActiveMQConnection(final Transport transport, IdGenerator 
clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
+    protected ActiveMQConnection(final Transport transport, IdGenerator 
clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl 
factoryStats) throws Exception {
 
         this.transport = transport;
         this.clientIdGenerator = clientIdGenerator;
@@ -216,7 +216,7 @@ public class ActiveMQConnection implemen
             }
         });
         // asyncConnectionThread.allowCoreThreadTimeOut(true);
-        String uniqueId = CONNECTION_ID_GENERATOR.generateId();
+        String uniqueId = connectionIdGenerator.generateId();
         this.info = new ConnectionInfo(new ConnectionId(uniqueId));
         this.info.setManageable(true);
         this.info.setFaultTolerant(transport.isFaultTolerant());

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
 Thu Mar 31 15:11:09 2011
@@ -83,6 +83,8 @@ public class ActiveMQConnectionFactory e
 
     private IdGenerator clientIdGenerator;
     private String clientIDPrefix;
+    private IdGenerator connectionIdGenerator;
+    private String connectionIDPrefix;
 
     // client policies
     private ActiveMQPrefetchPolicy prefetchPolicy = new 
ActiveMQPrefetchPolicy();
@@ -288,7 +290,8 @@ public class ActiveMQConnectionFactory e
     }
 
     protected ActiveMQConnection createActiveMQConnection(Transport transport, 
JMSStatsImpl stats) throws Exception {
-        ActiveMQConnection connection = new ActiveMQConnection(transport, 
getClientIdGenerator(), stats);
+        ActiveMQConnection connection = new ActiveMQConnection(transport, 
getClientIdGenerator(),
+                getConnectionIdGenerator(), stats);
         return connection;
     }
 
@@ -844,6 +847,29 @@ public class ActiveMQConnectionFactory e
     }
 
     /**
+     * Sets the prefix used by connection id generator
+     * @param connectionIDPrefix
+     */
+    public void setConnectionIDPrefix(String connectionIDPrefix) {
+        this.connectionIDPrefix = connectionIDPrefix;
+    }
+
+    protected synchronized IdGenerator getConnectionIdGenerator() {
+        if (connectionIdGenerator == null) {
+            if (connectionIDPrefix != null) {
+                connectionIdGenerator = new IdGenerator(connectionIDPrefix);
+            } else {
+                connectionIdGenerator = new IdGenerator();
+            }
+        }
+        return connectionIdGenerator;
+    }
+
+    protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) 
{
+        this.connectionIdGenerator = connectionIdGenerator;
+    }
+
+    /**
      * @return the statsEnabled
      */
     public boolean isStatsEnabled() {

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
 Thu Mar 31 15:11:09 2011
@@ -50,8 +50,9 @@ import org.apache.activemq.util.IdGenera
  */
 public class ActiveMQXAConnection extends ActiveMQConnection implements 
XATopicConnection, XAQueueConnection, XAConnection {
 
-    protected ActiveMQXAConnection(Transport transport, IdGenerator 
clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
-        super(transport, clientIdGenerator, factoryStats);
+    protected ActiveMQXAConnection(Transport transport, IdGenerator 
clientIdGenerator,
+                                   IdGenerator connectionIdGenerator, 
JMSStatsImpl factoryStats) throws Exception {
+        super(transport, clientIdGenerator, connectionIdGenerator, 
factoryStats);
     }
 
     public XASession createXASession() throws JMSException {

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java
 Thu Mar 31 15:11:09 2011
@@ -80,7 +80,7 @@ public class ActiveMQXAConnectionFactory
     }
 
     protected ActiveMQConnection createActiveMQConnection(Transport transport, 
JMSStatsImpl stats) throws Exception {
-        ActiveMQXAConnection connection = new ActiveMQXAConnection(transport, 
getClientIdGenerator(), stats);
+        ActiveMQXAConnection connection = new ActiveMQXAConnection(transport, 
getClientIdGenerator(), getConnectionIdGenerator(), stats);
         return connection;
     }
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
 Thu Mar 31 15:11:09 2011
@@ -297,10 +297,6 @@ public class BrokerService implements Se
      * @throws Exception
      */
     public NetworkConnector addNetworkConnector(URI discoveryAddress) throws 
Exception {
-        if (!isAdvisorySupport()) {
-            throw new javax.jms.IllegalStateException(
-                    "Networks require advisory messages to function - 
advisories are currently disabled");
-        }
         NetworkConnector connector = new 
DiscoveryNetworkConnector(discoveryAddress);
         return addNetworkConnector(connector);
     }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
 Thu Mar 31 15:11:09 2011
@@ -28,10 +28,12 @@ import org.apache.activemq.broker.Connec
 import org.apache.activemq.broker.ConsumerBrokerExchange;
 import org.apache.activemq.broker.DestinationAlreadyExistsException;
 import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.TransportConnection;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatchNotification;
@@ -130,6 +132,18 @@ public abstract class AbstractRegion imp
                     destinations.put(destination, dest);
                     destinationMap.put(destination, dest);
                     addSubscriptionsForDestination(context, dest);
+                    if (destination.isTemporary()) {
+                        // need to associate with the connection so it can get 
removed
+                        if (context.getConnection() instanceof 
TransportConnection) {
+                            TransportConnection transportConnection = 
(TransportConnection) context.getConnection();
+                            DestinationInfo info = new 
DestinationInfo(context.getConnectionId(),
+                                    DestinationInfo.ADD_OPERATION_TYPE,
+                                    destination);
+                            transportConnection.processAddDestination(info);
+                            LOG.debug("assigning ownership of auto created 
temp : " + destination + " to connection:"
+                                    + context.getConnectionId());
+                        }
+                    }
                 }
                 if (dest == null) {
                     throw new JMSException("The destination " + destination + 
" does not exist.");

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 Thu Mar 31 15:11:09 2011
@@ -834,7 +834,7 @@ public class Queue extends BaseDestinati
         }finally {
             messagesLock.readLock().unlock();
         }
-        return "Queue: destination=" + destination.getPhysicalName() + ", 
subscriptions=" + consumers.size()
+        return destination.getQualifiedName() + ", subscriptions=" + 
consumers.size()
                 + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + 
size + ", in flight groups="
                 + messageGroupOwners;
     }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
 Thu Mar 31 15:11:09 2011
@@ -393,7 +393,7 @@ public class RegionBroker extends EmptyB
 
                 // This seems to cause the destination to be added but without
                 // advisories firing...
-                context.getBroker().addDestination(context, destination, 
false);
+                context.getBroker().addDestination(context, destination, true);
                 switch (destination.getDestinationType()) {
                 case ActiveMQDestination.QUEUE_TYPE:
                     queueRegion.addProducer(context, info);

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
 Thu Mar 31 15:11:09 2011
@@ -20,6 +20,7 @@ import javax.jms.JMSException;
 import javax.jms.TemporaryQueue;
 
 /**
+ * @org.apache.xbean.XBean element="tempQueue" description="An ActiveMQ 
Temporary Queue Destination"
  * @openwire:marshaller code="102"
  * 
  */

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
 Thu Mar 31 15:11:09 2011
@@ -20,6 +20,7 @@ import javax.jms.JMSException;
 import javax.jms.TemporaryTopic;
 
 /**
+ * @org.apache.xbean.XBean element="tempTopic" description="An ActiveMQ 
Temporary Topic Destination"
  * @openwire:marshaller code="103"
  * 
  */

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
 Thu Mar 31 15:11:09 2011
@@ -127,19 +127,21 @@ public class SimpleDiscoveryAgent implem
                         event.connectFailures++;
 
                         if (maxReconnectAttempts > 0 && event.connectFailures 
>= maxReconnectAttempts) {
-                            LOG.debug("Reconnect attempts exceeded 
"+maxReconnectAttempts+" tries.  Reconnecting has been disabled.");
+                            LOG.warn("Reconnect attempts exceeded 
"+maxReconnectAttempts+" tries.  Reconnecting has been disabled.");
                             return;
                         }
 
                         synchronized (sleepMutex) {
                             try {
                                 if (!running.get()) {
+                                    LOG.debug("Reconnecting disabled: 
stopped");
                                     return;
                                 }
 
                                 LOG.debug("Waiting "+event.reconnectDelay+" ms 
before attempting to reconnect.");
                                 sleepMutex.wait(event.reconnectDelay);
                             } catch (InterruptedException ie) {
+                                LOG.debug("Reconnecting disabled: " + ie);
                                 Thread.currentThread().interrupt();
                                 return;
                             }
@@ -161,6 +163,7 @@ public class SimpleDiscoveryAgent implem
                     }
 
                     if (!running.get()) {
+                        LOG.debug("Reconnecting disabled: stopped");
                         return;
                     }
 

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
 Thu Mar 31 15:11:09 2011
@@ -52,11 +52,14 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkBridge;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.MessageIdList;
 import org.apache.activemq.util.Wait;
 import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.core.io.Resource;
 
 /**
@@ -66,6 +69,7 @@ import org.springframework.core.io.Resou
  * 
  */
 public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
+    private static final Logger LOG = 
LoggerFactory.getLogger(JmsMultipleBrokersTestSupport.class);
     public static final String AUTO_ASSIGN_TRANSPORT = "tcp://localhost:0";
     public static int maxSetupTime = 5000;
 
@@ -170,7 +174,14 @@ public class JmsMultipleBrokersTestSuppo
         if (!broker.getNetworkConnectors().isEmpty()) {
             result = Wait.waitFor(new Wait.Condition() {
                 public boolean isSatisified() throws Exception {
-                    return 
(broker.getNetworkConnectors().get(bridgeIndex).activeBridges().size() >= min);
+                    int activeCount = 0;
+                    for (NetworkBridge bridge : 
broker.getNetworkConnectors().get(bridgeIndex).activeBridges()) {
+                        if (bridge.getRemoteBrokerName() != null) {
+                            LOG.info("found bridge to " + 
bridge.getRemoteBrokerName() + " on broker :" + broker.getBrokerName());
+                            activeCount++;
+                        }
+                    }
+                    return activeCount >= min;
                 }}, wait);
         }
         return result;

Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java?rev=1087330&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
 (added)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
 Thu Mar 31 15:11:09 2011
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.URLStreamHandler;
+import java.net.URLStreamHandlerFactory;
+import java.util.Map;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RequestReplyNoAdvisoryNetworkTest extends 
JmsMultipleBrokersTestSupport {
+    private static final transient Logger LOG = 
LoggerFactory.getLogger(RequestReplyNoAdvisoryNetworkTest.class);
+
+    BrokerService a, b;
+    ActiveMQQueue sendQ = new ActiveMQQueue("sendQ");
+    static final String connectionIdMarker = "ID:marker.";
+    ActiveMQTempQueue replyQWildcard = new 
ActiveMQTempQueue(connectionIdMarker + ">");
+    private long receiveTimeout = 30000;
+
+    public void testNonAdvisoryNetworkRequestReplyXmlConfig() throws Exception 
{
+        final String xmlConfigString = new String(
+                "<beans" +
+                " xmlns=\"http://www.springframework.org/schema/beans\""; +
+                " xmlns:amq=\"http://activemq.apache.org/schema/core\""; +
+                " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\""; +
+                " 
xsi:schemaLocation=\"http://www.springframework.org/schema/beans"; +
+                " 
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd"; +
+                " http://activemq.apache.org/schema/core"; +
+                " http://activemq.apache.org/schema/core/activemq-core.xsd\";>" 
+
+                "  <broker xmlns=\"http://activemq.apache.org/schema/core\"; 
id=\"broker\"" +
+                "    brokerName=\"%HOST%\" persistent=\"false\" 
advisorySupport=\"false\" useJmx=\"false\" >" +
+                "   <destinationPolicy>" +
+                "    <policyMap>" +
+                "     <policyEntries>" +
+                "      <policyEntry optimizedDispatch=\"true\">"+
+                "       <destination>"+
+                "        <tempQueue physicalName=\"" + 
replyQWildcard.getPhysicalName() + "\"/>" +
+                "       </destination>" +
+                "      </policyEntry>" +
+                "     </policyEntries>" +
+                "    </policyMap>" +
+                "   </destinationPolicy>" +
+                "   <networkConnectors>" +
+                "    <networkConnector uri=\"multicast://default\">" +
+                "     <staticallyIncludedDestinations>" +
+                "      <queue physicalName=\"" + sendQ.getPhysicalName() + 
"\"/>" +
+                "      <tempQueue physicalName=\"" + 
replyQWildcard.getPhysicalName() + "\"/>" +
+                "     </staticallyIncludedDestinations>" +
+                "    </networkConnector>" +
+                "   </networkConnectors>" +
+                "   <transportConnectors>" +
+                "     <transportConnector uri=\"tcp://0.0.0.0:0\" 
discoveryUri=\"multicast://default\" />" +
+                "   </transportConnectors>" +
+                "  </broker>" +
+                "</beans>");
+        final String localProtocolScheme = "inline";
+        URL.setURLStreamHandlerFactory(new URLStreamHandlerFactory() {
+            @Override
+            public URLStreamHandler createURLStreamHandler(String protocol) {
+                if (localProtocolScheme.equalsIgnoreCase(protocol)) {
+                    return new URLStreamHandler() {
+                        @Override
+                        protected URLConnection openConnection(URL u) throws 
IOException {
+                            return new URLConnection(u) {
+                                @Override
+                                public void connect() throws IOException {
+                                }
+                                @Override
+                                public InputStream getInputStream() throws 
IOException {
+                                    return new 
ByteArrayInputStream(xmlConfigString.replace("%HOST%", 
url.getFile()).getBytes("UTF-8"));
+                                }
+                            };
+                        }
+                    };
+                }
+                return null;
+            }
+        });
+        a = new XBeanBrokerFactory().createBroker(new URI("xbean:" + 
localProtocolScheme + ":A"));
+        b = new XBeanBrokerFactory().createBroker(new URI("xbean:" + 
localProtocolScheme + ":B"));
+
+        doTestNonAdvisoryNetworkRequestReply();
+    }
+
+    public void testNonAdvisoryNetworkRequestReply() throws Exception {
+        createBridgeAndStartBrokers();
+        doTestNonAdvisoryNetworkRequestReply();
+    }
+
+    public void doTestNonAdvisoryNetworkRequestReply() throws Exception {
+
+        waitForBridgeFormation(a, 1, 0);
+        waitForBridgeFormation(b, 1, 0);
+
+        ActiveMQConnectionFactory sendFactory = createConnectionFactory(a);
+        ActiveMQConnection sendConnection = createConnection(sendFactory);
+
+        ActiveMQSession sendSession = 
(ActiveMQSession)sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = sendSession.createProducer(sendQ);
+        ActiveMQTempQueue realReplyQ = (ActiveMQTempQueue) 
sendSession.createTemporaryQueue();
+        TextMessage message = sendSession.createTextMessage("1");
+        message.setJMSReplyTo(realReplyQ);
+        producer.send(message);
+
+        // responder
+        ActiveMQConnectionFactory consumerFactory = createConnectionFactory(b);
+        ActiveMQConnection consumerConnection = 
createConnection(consumerFactory);
+
+        ActiveMQSession consumerSession = 
(ActiveMQSession)consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = consumerSession.createConsumer(sendQ);
+        TextMessage received = (TextMessage) consumer.receive(receiveTimeout);
+        assertNotNull(received);
+
+        LOG.info("got request, sending reply");
+
+        MessageProducer consumerProducer = 
consumerSession.createProducer(received.getJMSReplyTo());
+        consumerProducer.send(consumerSession.createTextMessage("got " + 
received.getText()));
+        // temp dest on reply broker tied to this connection, 
setOptimizedDispatch=true ensures
+        // message gets delivered before destination is removed
+        consumerConnection.close();
+
+        // reply consumer
+        MessageConsumer replyConsumer = sendSession.createConsumer(realReplyQ);
+        TextMessage reply = (TextMessage) 
replyConsumer.receive(receiveTimeout);
+        assertNotNull("expected reply message", reply);
+        assertEquals("text is as expected", "got 1", reply.getText());
+        sendConnection.close();
+
+        verifyAllTempQueuesAreGone();
+    }
+
+    private void verifyAllTempQueuesAreGone() throws Exception {
+        for (BrokerService brokerService : new BrokerService[]{a, b}) {
+            RegionBroker regionBroker = (RegionBroker) 
brokerService.getRegionBroker();
+            Map temps = regionBroker.getTempTopicRegion().getDestinationMap();
+            assertTrue("no temp topics on " + brokerService + ", " + temps, 
temps.isEmpty());
+            temps = regionBroker.getTempQueueRegion().getDestinationMap();
+            assertTrue("no temp queues on " + brokerService + ", " + temps, 
temps.isEmpty());
+        }
+    }
+
+    private ActiveMQConnection createConnection(ActiveMQConnectionFactory 
factory) throws Exception {
+        ActiveMQConnection c =(ActiveMQConnection) factory.createConnection();
+        c.start();
+        return c;
+    }
+
+    private ActiveMQConnectionFactory createConnectionFactory(BrokerService 
brokerService) throws Exception {
+        String target = 
brokerService.getTransportConnectors().get(0).getPublishableConnectString();
+        ActiveMQConnectionFactory factory =
+                new ActiveMQConnectionFactory(target);
+        factory.setWatchTopicAdvisories(false);
+        factory.setConnectionIDPrefix(connectionIdMarker + 
brokerService.getBrokerName());
+        return factory;
+    }
+
+    public void createBridgeAndStartBrokers() throws Exception {
+        a = configureBroker("A");
+        b = configureBroker("B");
+        bridge(a, b);
+        bridge(b, a);
+        a.start();
+        b.start();
+    }
+
+    public void tearDown() throws Exception {
+        stop(a);
+        stop(b);
+    }
+
+    private void stop(BrokerService broker) throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    private void bridge(BrokerService from, BrokerService to) throws Exception 
{
+        TransportConnector toConnector = to.addConnector("tcp://localhost:0");
+        NetworkConnector bridge =
+                from.addNetworkConnector("static://" + 
toConnector.getPublishableConnectString());
+        bridge.addStaticallyIncludedDestination(sendQ);
+        bridge.addStaticallyIncludedDestination(replyQWildcard);
+    }
+
+    private BrokerService configureBroker(String brokerName) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName(brokerName);
+        broker.setAdvisorySupport(false);
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+
+        PolicyMap map = new PolicyMap();
+        PolicyEntry tempReplyQPolicy = new PolicyEntry();
+        tempReplyQPolicy.setOptimizedDispatch(true);
+        map.put(replyQWildcard, tempReplyQPolicy);
+        broker.setDestinationPolicy(map);
+        return broker;
+    }
+}

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to