Author: jlim
Date: Mon Mar 5 10:04:44 2007
New Revision: 514754
URL: http://svn.apache.org/viewvc?view=rev&rev=514754
Log:
ported fix to trunk :
http://issues.apache.org/activemq/browse/AMQ-1176
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?view=diff&rev=514754&r1=514753&r2=514754
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Mon Mar 5 10:04:44 2007
@@ -134,6 +134,8 @@
private boolean useRetroactiveConsumer;
private boolean alwaysSyncSend;
private int closeTimeout = 15000;
+ private boolean useSyncSend=false;
+ private boolean watchTopicAdvisories=true;
private final Transport transport;
private final IdGenerator clientIdGenerator;
@@ -1283,7 +1285,9 @@
// broker without having to do an RPC to the broker.
ConsumerId consumerId = new ConsumerId(new
SessionId(info.getConnectionId(), -1),consumerIdGenerator.getNextSequenceId());
- advisoryConsumer = new AdvisoryConsumer(this, consumerId);
+ if( watchTopicAdvisories ) {
+ advisoryConsumer = new AdvisoryConsumer(this, consumerId);
+ }
}
@@ -1293,6 +1297,21 @@
public boolean isUseAsyncSend() {
return useAsyncSend;
}
+
+ public void setUseSyncSend(boolean forceSyncSend) {
+ this.useSyncSend = forceSyncSend;
+ }
+
+
+ public synchronized boolean isWatchTopicAdvisories() {
+ return watchTopicAdvisories;
+ }
+
+
+ public synchronized void setWatchTopicAdvisories(boolean
watchTopicAdvisories) {
+ this.watchTopicAdvisories = watchTopicAdvisories;
+ }
+
/**
* Forces the use of <a
@@ -1647,7 +1666,15 @@
*/
public void deleteTempDestination(ActiveMQTempDestination destination)
throws JMSException {
- checkClosedOrFailed();
+ checkClosedOrFailed();
+
+ for(Iterator i=this.sessions.iterator();i.hasNext();){
+ ActiveMQSession s=(ActiveMQSession) i.next();
+ if( s.isInUse(destination) ) {
+ throw new JMSException("A consumer is consuming from the
temporary destination");
+ }
+ }
+
activeTempDestinations.remove(destination);
DestinationInfo info = new DestinationInfo();
@@ -1661,6 +1688,12 @@
public boolean isDeleted(ActiveMQDestination dest) {
+
+ // If we are not watching the advisories.. then
+ // we will assume that the temp destination does exist.
+ if( advisoryConsumer==null )
+ return false;
+
return !activeTempDestinations.contains(dest);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?view=diff&rev=514754&r1=514753&r2=514754
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
Mon Mar 5 10:04:44 2007
@@ -88,6 +88,8 @@
private boolean nestedMapAndListEnabled = true;
JMSStatsImpl factoryStats = new JMSStatsImpl();
private boolean alwaysSyncSend;
+ private boolean useSyncSend=false;
+ private boolean watchTopicAdvisories=true;
static protected final Executor DEFAULT_CONNECTION_EXECUTOR = new
ScheduledThreadPoolExecutor(5, new ThreadFactory() {
public Thread newThread(Runnable run) {
@@ -260,7 +262,8 @@
connection.setRedeliveryPolicy(getRedeliveryPolicy());
connection.setTransformer(getTransformer());
connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
-
+ connection.setWatchTopicAdvisories(watchTopicAdvisories);
+
transport.start();
if( clientID !=null )
@@ -431,6 +434,18 @@
this.useAsyncSend = useAsyncSend;
}
+ public void setUseSyncSend(boolean forceSyncSend) {
+ this.useSyncSend = forceSyncSend;
+ }
+
+ public synchronized boolean isWatchTopicAdvisories() {
+ return watchTopicAdvisories;
+ }
+
+ public synchronized void setWatchTopicAdvisories(boolean
watchTopicAdvisories) {
+ this.watchTopicAdvisories = watchTopicAdvisories;
+ }
+
/**
* @return true if always sync send messages
*/
@@ -564,7 +579,8 @@
props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
props.setProperty("useCompression",
Boolean.toString(isUseCompression()));
props.setProperty("useRetroactiveConsumer",
Boolean.toString(isUseRetroactiveConsumer()));
-
+ props.setProperty("watchTopicAdvisories",
Boolean.toString(isWatchTopicAdvisories()));
+
if (getUserName() != null) {
props.setProperty("userName", getUserName());
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?view=diff&rev=514754&r1=514753&r2=514754
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Mon Mar 5 10:04:44 2007
@@ -940,5 +940,9 @@
}
return false;
}
+
+ public boolean isInUse(ActiveMQTempDestination destination) {
+ return info.getDestination().equals(destination);
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?view=diff&rev=514754&r1=514753&r2=514754
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Mon Mar 5 10:04:44 2007
@@ -1831,6 +1831,14 @@
}
}
-
+ public boolean isInUse(ActiveMQTempDestination destination) {
+ for(Iterator iter=consumers.iterator();iter.hasNext();){
+ ActiveMQMessageConsumer c=(ActiveMQMessageConsumer) iter.next();
+ if( c.isInUse(destination) ) {
+ return true;
+ }
+ }
+ return false;
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=514754&r1=514753&r2=514754
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Mon Mar 5 10:04:44 2007
@@ -65,7 +65,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
+
/**
* Routes Broker operations to the correct messaging regions for processing.
@@ -87,7 +87,7 @@
protected final DestinationStatistics destinationStatistics = new
DestinationStatistics();
private final CopyOnWriteArrayList connections = new
CopyOnWriteArrayList();
- private final CopyOnWriteArraySet destinations = new CopyOnWriteArraySet();
+ private final HashMap destinations = new HashMap();
private final CopyOnWriteArrayList brokerInfos = new
CopyOnWriteArrayList();
private final LongSequenceGenerator sequenceGenerator = new
LongSequenceGenerator();
@@ -246,10 +246,14 @@
}
public Destination addDestination(ConnectionContext context,
ActiveMQDestination destination) throws Exception {
- if( destinations.contains(destination) ){
- throw new DestinationAlreadyExistsException(destination);
- }
- Destination answer = null;
+
+ Destination answer;
+ synchronized(destinations) {
+ answer = (Destination) destinations.get(destination);
+ if( answer!=null )
+ return answer;
+ }
+
switch(destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
answer = queueRegion.addDestination(context, destination);
@@ -267,31 +271,33 @@
throw createUnknownDestinationTypeException(destination);
}
- destinations.add(destination);
- return answer;
- }
-
- public void removeDestination(ConnectionContext
context,ActiveMQDestination destination,long timeout)
- throws Exception{
- if(destinations.contains(destination)){
- switch(destination.getDestinationType()){
- case ActiveMQDestination.QUEUE_TYPE:
- queueRegion.removeDestination(context,destination,timeout);
- break;
- case ActiveMQDestination.TOPIC_TYPE:
- topicRegion.removeDestination(context,destination,timeout);
- break;
- case ActiveMQDestination.TEMP_QUEUE_TYPE:
- tempQueueRegion.removeDestination(context,destination,timeout);
- break;
- case ActiveMQDestination.TEMP_TOPIC_TYPE:
- tempTopicRegion.removeDestination(context,destination,timeout);
- break;
- default:
- throw createUnknownDestinationTypeException(destination);
- }
- destinations.remove(destination);
- }
+ synchronized(destinations) {
+ destinations.put(destination, answer);
+ return answer;
+ }
+ }
+
+ public void removeDestination(ConnectionContext
context,ActiveMQDestination destination,long timeout) throws Exception{
+ synchronized(destinations) {
+ if( destinations.remove(destination)!=null ){
+ switch(destination.getDestinationType()){
+ case ActiveMQDestination.QUEUE_TYPE:
+
queueRegion.removeDestination(context,destination,timeout);
+ break;
+ case ActiveMQDestination.TOPIC_TYPE:
+
topicRegion.removeDestination(context,destination,timeout);
+ break;
+ case ActiveMQDestination.TEMP_QUEUE_TYPE:
+
tempQueueRegion.removeDestination(context,destination,timeout);
+ break;
+ case ActiveMQDestination.TEMP_TOPIC_TYPE:
+
tempTopicRegion.removeDestination(context,destination,timeout);
+ break;
+ default:
+ throw
createUnknownDestinationTypeException(destination);
+ }
+ }
+ }
}
public void addDestinationInfo(ConnectionContext context,DestinationInfo
info) throws Exception{
@@ -305,7 +311,10 @@
}
public ActiveMQDestination[] getDestinations() throws Exception {
- ArrayList l = new ArrayList(destinations);
+ ArrayList l;
+ synchronized(destinations) {
+ l = new ArrayList(destinations.values());
+ }
ActiveMQDestination rc[] = new ActiveMQDestination[l.size()];
l.toArray(rc);
return rc;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?view=diff&rev=514754&r1=514753&r2=514754
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
Mon Mar 5 10:04:44 2007
@@ -18,7 +18,11 @@
package org.apache.activemq.broker.region;
import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -31,8 +35,26 @@
public TempQueueRegion(RegionBroker broker,DestinationStatistics
destinationStatistics, UsageManager memoryManager, TaskRunnerFactory
taskRunnerFactory, DestinationFactory destinationFactory) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory,
destinationFactory);
- setAutoCreateDestinations(false);
+ // We should allow the following to be configurable via a Destination
Policy
+ // setAutoCreateDestinations(false);
}
+
+ protected Destination createDestination(ConnectionContext context,
ActiveMQDestination destination) throws Exception {
+ final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)
destination;
+ return new Queue(destination, memoryManager, null,
destinationStatistics, taskRunnerFactory, null) {
+
+ public void addSubscription(ConnectionContext context,Subscription
sub) throws Exception {
+
+ // Only consumers on the same connection can consume from
+ // the temporary destination
+ if( !context.isNetworkConnection() &&
!tempDest.getConnectionId().equals(
sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
+ throw new JMSException("Cannot subscribe to remote
temporary destination: "+tempDest);
+ }
+ super.addSubscription(context, sub);
+ };
+
+ };
+ }
protected Subscription createSubscription(ConnectionContext context,
ConsumerInfo info) throws InvalidSelectorException {
if( info.isBrowser() ) {
@@ -44,6 +66,17 @@
public String toString() {
return "TempQueueRegion: destinations="+destinations.size()+",
subscriptions="+subscriptions.size()+",
memory="+memoryManager.getPercentUsage()+"%";
+ }
+
+ public void removeDestination(ConnectionContext context,
ActiveMQDestination destination, long timeout) throws Exception {
+
+ // Force a timeout value so that we don't get an error that
+ // there is still an active sub. Temp destination may be removed
+ // while a network sub is still active which is valid.
+ if( timeout == 0 )
+ timeout = 1;
+
+ super.removeDestination(context, destination, timeout);
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java?view=diff&rev=514754&r1=514753&r2=514754
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
Mon Mar 5 10:04:44 2007
@@ -16,6 +16,7 @@
import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
@@ -36,7 +37,8 @@
public TempTopicRegion(RegionBroker broker,DestinationStatistics
destinationStatistics,UsageManager memoryManager,
TaskRunnerFactory taskRunnerFactory,DestinationFactory
destinationFactory){
super(broker,destinationStatistics,memoryManager,taskRunnerFactory,destinationFactory);
- setAutoCreateDestinations(false);
+ // We should allow the following to be configurable via a Destination
Policy
+ // setAutoCreateDestinations(false);
}
protected Subscription createSubscription(ConnectionContext
context,ConsumerInfo info) throws JMSException{
@@ -67,4 +69,15 @@
return "TempTopicRegion: destinations="+destinations.size()+",
subscriptions="+subscriptions.size()+", memory="
+memoryManager.getPercentUsage()+"%";
}
+
+ public void removeDestination(ConnectionContext context,
ActiveMQDestination destination, long timeout) throws Exception {
+
+ // Force a timeout value so that we don't get an error that
+ // there is still an active sub. Temp destination may be removed
+ // while a network sub is still active which is valid.
+ if( timeout == 0 )
+ timeout = 1;
+
+ super.removeDestination(context, destination, timeout);
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?view=diff&rev=514754&r1=514753&r2=514754
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Mon Mar 5 10:04:44 2007
@@ -358,6 +358,8 @@
lastConnectSucceeded.set(true);
serviceRemoteBrokerInfo(command);
+ // Let the local broker know the remote broker's ID.
+ localBroker.oneway(command);
}else if(command.getClass() == ConnectionError.class ) {
ConnectionError ce = (ConnectionError) command;
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java?view=diff&rev=514754&r1=514753&r2=514754
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java
Mon Mar 5 10:04:44 2007
@@ -535,81 +535,97 @@
assertNoMessagesLeft(connection1);
}
- public void initCombosForTestTempDestinationsRemovedOnConnectionClose() {
- addCombinationValues( "deliveryMode", new Object[]{
- new Integer(DeliveryMode.NON_PERSISTENT),
- new Integer(DeliveryMode.PERSISTENT)} );
- addCombinationValues( "destinationType", new Object[]{
- new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
- new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
- }
- public void testTempDestinationsRemovedOnConnectionClose() throws
Exception {
-
- // Setup a first connection
- StubConnection connection1 = createConnection();
- ConnectionInfo connectionInfo1 = createConnectionInfo();
- SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
- ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
- connection1.send(connectionInfo1);
- connection1.send(sessionInfo1);
- connection1.send(producerInfo1);
+//
+// TODO: need to reimplement this since we don't fail when we send to a
non-existant
+// destination. But if we can access the Region directly then we should be
able to
+// check that if the destination was removed.
+//
+// public void initCombosForTestTempDestinationsRemovedOnConnectionClose()
{
+// addCombinationValues( "deliveryMode", new Object[]{
+// new Integer(DeliveryMode.NON_PERSISTENT),
+// new Integer(DeliveryMode.PERSISTENT)} );
+// addCombinationValues( "destinationType", new Object[]{
+// new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
+// new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
+// }
+//
+// public void testTempDestinationsRemovedOnConnectionClose() throws
Exception {
+//
+// // Setup a first connection
+// StubConnection connection1 = createConnection();
+// ConnectionInfo connectionInfo1 = createConnectionInfo();
+// SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+// ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+// connection1.send(connectionInfo1);
+// connection1.send(sessionInfo1);
+// connection1.send(producerInfo1);
+//
+// destination = createDestinationInfo(connection1, connectionInfo1,
destinationType);
+//
+// StubConnection connection2 = createConnection();
+// ConnectionInfo connectionInfo2 = createConnectionInfo();
+// SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+// ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+// connection2.send(connectionInfo2);
+// connection2.send(sessionInfo2);
+// connection2.send(producerInfo2);
+//
+// // Send from connection2 to connection1's temp destination. Should
succeed.
+// connection2.send(createMessage(producerInfo2, destination,
deliveryMode));
+//
+// // Close connection 1
+// connection1.request(closeConnectionInfo(connectionInfo1));
+//
+// try {
+// // Send from connection2 to connection1's temp destination.
Should not succeed.
+// connection2.request(createMessage(producerInfo2, destination,
deliveryMode));
+// fail("Expected JMSException.");
+// } catch ( JMSException success ) {
+// }
+//
+// }
- destination = createDestinationInfo(connection1, connectionInfo1,
destinationType);
-
- StubConnection connection2 = createConnection();
- ConnectionInfo connectionInfo2 = createConnectionInfo();
- SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
- ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
- connection2.send(connectionInfo2);
- connection2.send(sessionInfo2);
- connection2.send(producerInfo2);
+
+// public void initCombosForTestTempDestinationsAreNotAutoCreated() {
+// addCombinationValues( "deliveryMode", new Object[]{
+// new Integer(DeliveryMode.NON_PERSISTENT),
+// new Integer(DeliveryMode.PERSISTENT)} );
+// addCombinationValues( "destinationType", new Object[]{
+// new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
+// new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
+// }
+//
+//
+
+
+// We create temp destination on demand now so this test case is no longer
+// valid.
+//
+// public void testTempDestinationsAreNotAutoCreated() throws Exception {
+//
+// // Setup a first connection
+// StubConnection connection1 = createConnection();
+// ConnectionInfo connectionInfo1 = createConnectionInfo();
+// SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+// ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+// connection1.send(connectionInfo1);
+// connection1.send(sessionInfo1);
+// connection1.send(producerInfo1);
+//
+// destination =
ActiveMQDestination.createDestination(connectionInfo1.getConnectionId()+":1",
destinationType);
+//
+// // Should not be able to send to a non-existant temp destination.
+// try {
+// connection1.request(createMessage(producerInfo1, destination,
deliveryMode));
+// fail("Expected JMSException.");
+// } catch ( JMSException success ) {
+// }
+//
+// }
- // Send from connection2 to connection1's temp destination. Should
succeed.
- connection2.send(createMessage(producerInfo2, destination,
deliveryMode));
-
- // Close connection 1
- connection1.request(closeConnectionInfo(connectionInfo1));
-
- try {
- // Send from connection2 to connection1's temp destination.
Should not succeed.
- connection2.request(createMessage(producerInfo2, destination,
deliveryMode));
- fail("Expected JMSException.");
- } catch ( JMSException success ) {
- }
-
- }
- public void initCombosForTestTempDestinationsAreNotAutoCreated() {
- addCombinationValues( "deliveryMode", new Object[]{
- new Integer(DeliveryMode.NON_PERSISTENT),
- new Integer(DeliveryMode.PERSISTENT)} );
- addCombinationValues( "destinationType", new Object[]{
- new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
- new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
- }
-
- public void testTempDestinationsAreNotAutoCreated() throws Exception {
-
- // Setup a first connection
- StubConnection connection1 = createConnection();
- ConnectionInfo connectionInfo1 = createConnectionInfo();
- SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
- ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
- connection1.send(connectionInfo1);
- connection1.send(sessionInfo1);
- connection1.send(producerInfo1);
- destination =
ActiveMQDestination.createDestination(connectionInfo1.getConnectionId()+":1",
destinationType);
-
- // Should not be able to send to a non-existant temp destination.
- try {
- connection1.request(createMessage(producerInfo1, destination,
deliveryMode));
- fail("Expected JMSException.");
- } catch ( JMSException success ) {
- }
-
- }
public void initCombosForTestTempDestinationsOnlyAllowsLocalConsumers() {
addCombinationValues( "deliveryMode", new Object[]{