Author: gtully
Date: Thu Mar 31 15:11:09 2011
New Revision: 1087330
URL: http://svn.apache.org/viewvc?rev=1087330&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3253 - Support Temporary Destinations
in a network without advisories. Allow the connection id generator prefix to be
set via a connection factory such that temp identies can be configured such
that they are suitable for inclusion in a network list of statically included
destintions. Allow auto recreation of temp destinations by a new producer and
tie lifecycle to the producers connection. This allows configurable support for
request reply with temps in a network with advisory support disabled
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
(with props)
Modified:
activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnection.java
activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
Modified:
activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnection.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
---
activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnection.java
(original)
+++
activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnection.java
Thu Mar 31 15:11:09 2011
@@ -30,8 +30,9 @@ public class CamelConnection extends Act
private CamelContext camelContext;
- protected CamelConnection(Transport transport, IdGenerator
clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
- super(transport, clientIdGenerator, factoryStats);
+ protected CamelConnection(Transport transport, IdGenerator
clientIdGenerator,
+ IdGenerator connectionIdGenerator, JMSStatsImpl
factoryStats) throws Exception {
+ super(transport, clientIdGenerator, connectionIdGenerator,
factoryStats);
}
public CamelContext getCamelContext() {
Modified:
activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
---
activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java
(original)
+++
activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java
Thu Mar 31 15:11:09 2011
@@ -45,7 +45,7 @@ public class CamelConnectionFactory exte
// Implementation methods
//-----------------------------------------------------------------------
protected CamelConnection createActiveMQConnection(Transport transport,
JMSStatsImpl stats) throws Exception {
- CamelConnection connection = new CamelConnection(transport,
getClientIdGenerator(), stats);
+ CamelConnection connection = new CamelConnection(transport,
getClientIdGenerator(), getConnectionIdGenerator(), stats);
CamelContext context = getCamelContext();
if (context != null) {
connection.setCamelContext(context);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Thu Mar 31 15:11:09 2011
@@ -200,7 +200,7 @@ public class ActiveMQConnection implemen
* @param factoryStats
* @throws Exception
*/
- protected ActiveMQConnection(final Transport transport, IdGenerator
clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
+ protected ActiveMQConnection(final Transport transport, IdGenerator
clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl
factoryStats) throws Exception {
this.transport = transport;
this.clientIdGenerator = clientIdGenerator;
@@ -216,7 +216,7 @@ public class ActiveMQConnection implemen
}
});
// asyncConnectionThread.allowCoreThreadTimeOut(true);
- String uniqueId = CONNECTION_ID_GENERATOR.generateId();
+ String uniqueId = connectionIdGenerator.generateId();
this.info = new ConnectionInfo(new ConnectionId(uniqueId));
this.info.setManageable(true);
this.info.setFaultTolerant(transport.isFaultTolerant());
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
Thu Mar 31 15:11:09 2011
@@ -83,6 +83,8 @@ public class ActiveMQConnectionFactory e
private IdGenerator clientIdGenerator;
private String clientIDPrefix;
+ private IdGenerator connectionIdGenerator;
+ private String connectionIDPrefix;
// client policies
private ActiveMQPrefetchPolicy prefetchPolicy = new
ActiveMQPrefetchPolicy();
@@ -288,7 +290,8 @@ public class ActiveMQConnectionFactory e
}
protected ActiveMQConnection createActiveMQConnection(Transport transport,
JMSStatsImpl stats) throws Exception {
- ActiveMQConnection connection = new ActiveMQConnection(transport,
getClientIdGenerator(), stats);
+ ActiveMQConnection connection = new ActiveMQConnection(transport,
getClientIdGenerator(),
+ getConnectionIdGenerator(), stats);
return connection;
}
@@ -844,6 +847,29 @@ public class ActiveMQConnectionFactory e
}
/**
+ * Sets the prefix used by connection id generator
+ * @param connectionIDPrefix
+ */
+ public void setConnectionIDPrefix(String connectionIDPrefix) {
+ this.connectionIDPrefix = connectionIDPrefix;
+ }
+
+ protected synchronized IdGenerator getConnectionIdGenerator() {
+ if (connectionIdGenerator == null) {
+ if (connectionIDPrefix != null) {
+ connectionIdGenerator = new IdGenerator(connectionIDPrefix);
+ } else {
+ connectionIdGenerator = new IdGenerator();
+ }
+ }
+ return connectionIdGenerator;
+ }
+
+ protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator)
{
+ this.connectionIdGenerator = connectionIdGenerator;
+ }
+
+ /**
* @return the statsEnabled
*/
public boolean isStatsEnabled() {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
Thu Mar 31 15:11:09 2011
@@ -50,8 +50,9 @@ import org.apache.activemq.util.IdGenera
*/
public class ActiveMQXAConnection extends ActiveMQConnection implements
XATopicConnection, XAQueueConnection, XAConnection {
- protected ActiveMQXAConnection(Transport transport, IdGenerator
clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
- super(transport, clientIdGenerator, factoryStats);
+ protected ActiveMQXAConnection(Transport transport, IdGenerator
clientIdGenerator,
+ IdGenerator connectionIdGenerator,
JMSStatsImpl factoryStats) throws Exception {
+ super(transport, clientIdGenerator, connectionIdGenerator,
factoryStats);
}
public XASession createXASession() throws JMSException {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java
Thu Mar 31 15:11:09 2011
@@ -80,7 +80,7 @@ public class ActiveMQXAConnectionFactory
}
protected ActiveMQConnection createActiveMQConnection(Transport transport,
JMSStatsImpl stats) throws Exception {
- ActiveMQXAConnection connection = new ActiveMQXAConnection(transport,
getClientIdGenerator(), stats);
+ ActiveMQXAConnection connection = new ActiveMQXAConnection(transport,
getClientIdGenerator(), getConnectionIdGenerator(), stats);
return connection;
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Thu Mar 31 15:11:09 2011
@@ -297,10 +297,6 @@ public class BrokerService implements Se
* @throws Exception
*/
public NetworkConnector addNetworkConnector(URI discoveryAddress) throws
Exception {
- if (!isAdvisorySupport()) {
- throw new javax.jms.IllegalStateException(
- "Networks require advisory messages to function -
advisories are currently disabled");
- }
NetworkConnector connector = new
DiscoveryNetworkConnector(discoveryAddress);
return addNetworkConnector(connector);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Thu Mar 31 15:11:09 2011
@@ -28,10 +28,12 @@ import org.apache.activemq.broker.Connec
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.DestinationAlreadyExistsException;
import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
@@ -130,6 +132,18 @@ public abstract class AbstractRegion imp
destinations.put(destination, dest);
destinationMap.put(destination, dest);
addSubscriptionsForDestination(context, dest);
+ if (destination.isTemporary()) {
+ // need to associate with the connection so it can get
removed
+ if (context.getConnection() instanceof
TransportConnection) {
+ TransportConnection transportConnection =
(TransportConnection) context.getConnection();
+ DestinationInfo info = new
DestinationInfo(context.getConnectionId(),
+ DestinationInfo.ADD_OPERATION_TYPE,
+ destination);
+ transportConnection.processAddDestination(info);
+ LOG.debug("assigning ownership of auto created
temp : " + destination + " to connection:"
+ + context.getConnectionId());
+ }
+ }
}
if (dest == null) {
throw new JMSException("The destination " + destination +
" does not exist.");
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Mar 31 15:11:09 2011
@@ -834,7 +834,7 @@ public class Queue extends BaseDestinati
}finally {
messagesLock.readLock().unlock();
}
- return "Queue: destination=" + destination.getPhysicalName() + ",
subscriptions=" + consumers.size()
+ return destination.getQualifiedName() + ", subscriptions=" +
consumers.size()
+ ", memory=" + memoryUsage.getPercentUsage() + "%, size=" +
size + ", in flight groups="
+ messageGroupOwners;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Thu Mar 31 15:11:09 2011
@@ -393,7 +393,7 @@ public class RegionBroker extends EmptyB
// This seems to cause the destination to be added but without
// advisories firing...
- context.getBroker().addDestination(context, destination,
false);
+ context.getBroker().addDestination(context, destination, true);
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.addProducer(context, info);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
Thu Mar 31 15:11:09 2011
@@ -20,6 +20,7 @@ import javax.jms.JMSException;
import javax.jms.TemporaryQueue;
/**
+ * @org.apache.xbean.XBean element="tempQueue" description="An ActiveMQ
Temporary Queue Destination"
* @openwire:marshaller code="102"
*
*/
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
Thu Mar 31 15:11:09 2011
@@ -20,6 +20,7 @@ import javax.jms.JMSException;
import javax.jms.TemporaryTopic;
/**
+ * @org.apache.xbean.XBean element="tempTopic" description="An ActiveMQ
Temporary Topic Destination"
* @openwire:marshaller code="103"
*
*/
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
Thu Mar 31 15:11:09 2011
@@ -127,19 +127,21 @@ public class SimpleDiscoveryAgent implem
event.connectFailures++;
if (maxReconnectAttempts > 0 && event.connectFailures
>= maxReconnectAttempts) {
- LOG.debug("Reconnect attempts exceeded
"+maxReconnectAttempts+" tries. Reconnecting has been disabled.");
+ LOG.warn("Reconnect attempts exceeded
"+maxReconnectAttempts+" tries. Reconnecting has been disabled.");
return;
}
synchronized (sleepMutex) {
try {
if (!running.get()) {
+ LOG.debug("Reconnecting disabled:
stopped");
return;
}
LOG.debug("Waiting "+event.reconnectDelay+" ms
before attempting to reconnect.");
sleepMutex.wait(event.reconnectDelay);
} catch (InterruptedException ie) {
+ LOG.debug("Reconnecting disabled: " + ie);
Thread.currentThread().interrupt();
return;
}
@@ -161,6 +163,7 @@ public class SimpleDiscoveryAgent implem
}
if (!running.get()) {
+ LOG.debug("Reconnecting disabled: stopped");
return;
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
Thu Mar 31 15:11:09 2011
@@ -52,11 +52,14 @@ import org.apache.activemq.command.Activ
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.Wait;
import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
/**
@@ -66,6 +69,7 @@ import org.springframework.core.io.Resou
*
*/
public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
+ private static final Logger LOG =
LoggerFactory.getLogger(JmsMultipleBrokersTestSupport.class);
public static final String AUTO_ASSIGN_TRANSPORT = "tcp://localhost:0";
public static int maxSetupTime = 5000;
@@ -170,7 +174,14 @@ public class JmsMultipleBrokersTestSuppo
if (!broker.getNetworkConnectors().isEmpty()) {
result = Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
- return
(broker.getNetworkConnectors().get(bridgeIndex).activeBridges().size() >= min);
+ int activeCount = 0;
+ for (NetworkBridge bridge :
broker.getNetworkConnectors().get(bridgeIndex).activeBridges()) {
+ if (bridge.getRemoteBrokerName() != null) {
+ LOG.info("found bridge to " +
bridge.getRemoteBrokerName() + " on broker :" + broker.getBrokerName());
+ activeCount++;
+ }
+ }
+ return activeCount >= min;
}}, wait);
}
return result;
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java?rev=1087330&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
Thu Mar 31 15:11:09 2011
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.URLStreamHandler;
+import java.net.URLStreamHandlerFactory;
+import java.util.Map;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RequestReplyNoAdvisoryNetworkTest extends
JmsMultipleBrokersTestSupport {
+ private static final transient Logger LOG =
LoggerFactory.getLogger(RequestReplyNoAdvisoryNetworkTest.class);
+
+ BrokerService a, b;
+ ActiveMQQueue sendQ = new ActiveMQQueue("sendQ");
+ static final String connectionIdMarker = "ID:marker.";
+ ActiveMQTempQueue replyQWildcard = new
ActiveMQTempQueue(connectionIdMarker + ">");
+ private long receiveTimeout = 30000;
+
+ public void testNonAdvisoryNetworkRequestReplyXmlConfig() throws Exception
{
+ final String xmlConfigString = new String(
+ "<beans" +
+ " xmlns=\"http://www.springframework.org/schema/beans\"" +
+ " xmlns:amq=\"http://activemq.apache.org/schema/core\"" +
+ " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"" +
+ "
xsi:schemaLocation=\"http://www.springframework.org/schema/beans" +
+ "
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd" +
+ " http://activemq.apache.org/schema/core" +
+ " http://activemq.apache.org/schema/core/activemq-core.xsd\">"
+
+ " <broker xmlns=\"http://activemq.apache.org/schema/core\"
id=\"broker\"" +
+ " brokerName=\"%HOST%\" persistent=\"false\"
advisorySupport=\"false\" useJmx=\"false\" >" +
+ " <destinationPolicy>" +
+ " <policyMap>" +
+ " <policyEntries>" +
+ " <policyEntry optimizedDispatch=\"true\">"+
+ " <destination>"+
+ " <tempQueue physicalName=\"" +
replyQWildcard.getPhysicalName() + "\"/>" +
+ " </destination>" +
+ " </policyEntry>" +
+ " </policyEntries>" +
+ " </policyMap>" +
+ " </destinationPolicy>" +
+ " <networkConnectors>" +
+ " <networkConnector uri=\"multicast://default\">" +
+ " <staticallyIncludedDestinations>" +
+ " <queue physicalName=\"" + sendQ.getPhysicalName() +
"\"/>" +
+ " <tempQueue physicalName=\"" +
replyQWildcard.getPhysicalName() + "\"/>" +
+ " </staticallyIncludedDestinations>" +
+ " </networkConnector>" +
+ " </networkConnectors>" +
+ " <transportConnectors>" +
+ " <transportConnector uri=\"tcp://0.0.0.0:0\"
discoveryUri=\"multicast://default\" />" +
+ " </transportConnectors>" +
+ " </broker>" +
+ "</beans>");
+ final String localProtocolScheme = "inline";
+ URL.setURLStreamHandlerFactory(new URLStreamHandlerFactory() {
+ @Override
+ public URLStreamHandler createURLStreamHandler(String protocol) {
+ if (localProtocolScheme.equalsIgnoreCase(protocol)) {
+ return new URLStreamHandler() {
+ @Override
+ protected URLConnection openConnection(URL u) throws
IOException {
+ return new URLConnection(u) {
+ @Override
+ public void connect() throws IOException {
+ }
+ @Override
+ public InputStream getInputStream() throws
IOException {
+ return new
ByteArrayInputStream(xmlConfigString.replace("%HOST%",
url.getFile()).getBytes("UTF-8"));
+ }
+ };
+ }
+ };
+ }
+ return null;
+ }
+ });
+ a = new XBeanBrokerFactory().createBroker(new URI("xbean:" +
localProtocolScheme + ":A"));
+ b = new XBeanBrokerFactory().createBroker(new URI("xbean:" +
localProtocolScheme + ":B"));
+
+ doTestNonAdvisoryNetworkRequestReply();
+ }
+
+ public void testNonAdvisoryNetworkRequestReply() throws Exception {
+ createBridgeAndStartBrokers();
+ doTestNonAdvisoryNetworkRequestReply();
+ }
+
+ public void doTestNonAdvisoryNetworkRequestReply() throws Exception {
+
+ waitForBridgeFormation(a, 1, 0);
+ waitForBridgeFormation(b, 1, 0);
+
+ ActiveMQConnectionFactory sendFactory = createConnectionFactory(a);
+ ActiveMQConnection sendConnection = createConnection(sendFactory);
+
+ ActiveMQSession sendSession =
(ActiveMQSession)sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = sendSession.createProducer(sendQ);
+ ActiveMQTempQueue realReplyQ = (ActiveMQTempQueue)
sendSession.createTemporaryQueue();
+ TextMessage message = sendSession.createTextMessage("1");
+ message.setJMSReplyTo(realReplyQ);
+ producer.send(message);
+
+ // responder
+ ActiveMQConnectionFactory consumerFactory = createConnectionFactory(b);
+ ActiveMQConnection consumerConnection =
createConnection(consumerFactory);
+
+ ActiveMQSession consumerSession =
(ActiveMQSession)consumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSession.createConsumer(sendQ);
+ TextMessage received = (TextMessage) consumer.receive(receiveTimeout);
+ assertNotNull(received);
+
+ LOG.info("got request, sending reply");
+
+ MessageProducer consumerProducer =
consumerSession.createProducer(received.getJMSReplyTo());
+ consumerProducer.send(consumerSession.createTextMessage("got " +
received.getText()));
+ // temp dest on reply broker tied to this connection,
setOptimizedDispatch=true ensures
+ // message gets delivered before destination is removed
+ consumerConnection.close();
+
+ // reply consumer
+ MessageConsumer replyConsumer = sendSession.createConsumer(realReplyQ);
+ TextMessage reply = (TextMessage)
replyConsumer.receive(receiveTimeout);
+ assertNotNull("expected reply message", reply);
+ assertEquals("text is as expected", "got 1", reply.getText());
+ sendConnection.close();
+
+ verifyAllTempQueuesAreGone();
+ }
+
+ private void verifyAllTempQueuesAreGone() throws Exception {
+ for (BrokerService brokerService : new BrokerService[]{a, b}) {
+ RegionBroker regionBroker = (RegionBroker)
brokerService.getRegionBroker();
+ Map temps = regionBroker.getTempTopicRegion().getDestinationMap();
+ assertTrue("no temp topics on " + brokerService + ", " + temps,
temps.isEmpty());
+ temps = regionBroker.getTempQueueRegion().getDestinationMap();
+ assertTrue("no temp queues on " + brokerService + ", " + temps,
temps.isEmpty());
+ }
+ }
+
+ private ActiveMQConnection createConnection(ActiveMQConnectionFactory
factory) throws Exception {
+ ActiveMQConnection c =(ActiveMQConnection) factory.createConnection();
+ c.start();
+ return c;
+ }
+
+ private ActiveMQConnectionFactory createConnectionFactory(BrokerService
brokerService) throws Exception {
+ String target =
brokerService.getTransportConnectors().get(0).getPublishableConnectString();
+ ActiveMQConnectionFactory factory =
+ new ActiveMQConnectionFactory(target);
+ factory.setWatchTopicAdvisories(false);
+ factory.setConnectionIDPrefix(connectionIdMarker +
brokerService.getBrokerName());
+ return factory;
+ }
+
+ public void createBridgeAndStartBrokers() throws Exception {
+ a = configureBroker("A");
+ b = configureBroker("B");
+ bridge(a, b);
+ bridge(b, a);
+ a.start();
+ b.start();
+ }
+
+ public void tearDown() throws Exception {
+ stop(a);
+ stop(b);
+ }
+
+ private void stop(BrokerService broker) throws Exception {
+ if (broker != null) {
+ broker.stop();
+ }
+ }
+
+ private void bridge(BrokerService from, BrokerService to) throws Exception
{
+ TransportConnector toConnector = to.addConnector("tcp://localhost:0");
+ NetworkConnector bridge =
+ from.addNetworkConnector("static://" +
toConnector.getPublishableConnectString());
+ bridge.addStaticallyIncludedDestination(sendQ);
+ bridge.addStaticallyIncludedDestination(replyQWildcard);
+ }
+
+ private BrokerService configureBroker(String brokerName) throws Exception {
+ BrokerService broker = new BrokerService();
+ broker.setBrokerName(brokerName);
+ broker.setAdvisorySupport(false);
+ broker.setPersistent(false);
+ broker.setUseJmx(false);
+
+ PolicyMap map = new PolicyMap();
+ PolicyEntry tempReplyQPolicy = new PolicyEntry();
+ tempReplyQPolicy.setOptimizedDispatch(true);
+ map.put(replyQWildcard, tempReplyQPolicy);
+ broker.setDestinationPolicy(map);
+ return broker;
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date