Updated Branches: refs/heads/trunk e3fed4b57 -> 11781d3cf
Added Fix and tests for https://issues.apache.org/jira/browse/AMQ-4906 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/11781d3c Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/11781d3c Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/11781d3c Branch: refs/heads/trunk Commit: 11781d3cf2a351ea5f3ef80f29f351ac42d3538e Parents: e3fed4b Author: rajdavies <[email protected]> Authored: Thu Nov 28 12:56:11 2013 +0000 Committer: rajdavies <[email protected]> Committed: Thu Nov 28 12:56:11 2013 +0000 ---------------------------------------------------------------------- .../activemq/advisory/AdvisoryBroker.java | 142 ++++++++----------- .../activemq/advisory/ConsumerListenerTest.java | 14 ++ .../activemq/advisory/ProducerListenerTest.java | 35 +++-- 3 files changed, 102 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/11781d3c/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index d68c5bd..5c90287 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -16,14 +16,12 @@ */ package org.apache.activemq.advisory; -import java.util.Comparator; import java.util.Iterator; import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentSkipListMap; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFilter; @@ -36,21 +34,7 @@ import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.broker.region.TopicSubscription; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.BrokerInfo; -import org.apache.activemq.command.Command; -import org.apache.activemq.command.ConnectionId; -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.DestinationInfo; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.ProducerId; -import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.command.RemoveSubscriptionInfo; +import org.apache.activemq.command.*; import org.apache.activemq.security.SecurityContext; import org.apache.activemq.state.ProducerState; import org.apache.activemq.usage.Usage; @@ -72,7 +56,7 @@ public class AdvisoryBroker extends BrokerFilter { protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>(); protected final Queue<ConsumerInfo> consumers = new ConcurrentLinkedQueue<ConsumerInfo>(); - + protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>(); protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>(); protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>(); @@ -111,7 +95,7 @@ public class AdvisoryBroker extends BrokerFilter { // for this newly added consumer. if (AdvisorySupport.isConnectionAdvisoryTopic(info.getDestination())) { // Replay the connections. - for (Iterator<ConnectionInfo> iter = connections.values().iterator(); iter.hasNext();) { + for (Iterator<ConnectionInfo> iter = connections.values().iterator(); iter.hasNext(); ) { ConnectionInfo value = iter.next(); ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic(); fireAdvisory(context, topic, value, info.getConsumerId()); @@ -140,25 +124,25 @@ public class AdvisoryBroker extends BrokerFilter { // Replay the producers. if (AdvisorySupport.isProducerAdvisoryTopic(info.getDestination())) { - for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext();) { + for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext(); ) { ProducerInfo value = iter.next(); ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination()); - fireProducerAdvisory(context, value.getDestination(),topic, value, info.getConsumerId()); + fireProducerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId()); } } // Replay the consumers. if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) { - for (Iterator<ConsumerInfo> iter = consumers.iterator(); iter.hasNext();) { + for (Iterator<ConsumerInfo> iter = consumers.iterator(); iter.hasNext(); ) { ConsumerInfo value = iter.next(); ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination()); - fireConsumerAdvisory(context,value.getDestination(), topic, value, info.getConsumerId()); + fireConsumerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId()); } } // Replay network bridges if (AdvisorySupport.isNetworkBridgeAdvisoryTopic(info.getDestination())) { - for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext();) { + for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext(); ) { BrokerInfo key = iter.next(); ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); fireAdvisory(context, topic, key, null, networkBridges.get(key)); @@ -181,12 +165,12 @@ public class AdvisoryBroker extends BrokerFilter { } @Override - public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception { - Destination answer = super.addDestination(context, destination,create); + public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception { + Destination answer = super.addDestination(context, destination, create); if (!AdvisorySupport.isAdvisoryTopic(destination)) { DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination); DestinationInfo previous = destinations.putIfAbsent(destination, info); - if( previous==null ) { + if (previous == null) { ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); fireAdvisory(context, topic, info); } @@ -201,7 +185,7 @@ public class AdvisoryBroker extends BrokerFilter { if (!AdvisorySupport.isAdvisoryTopic(destination)) { DestinationInfo previous = destinations.putIfAbsent(destination, info); - if( previous==null ) { + if (previous == null) { ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); fireAdvisory(context, topic, info); } @@ -220,7 +204,7 @@ public class AdvisoryBroker extends BrokerFilter { ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); fireAdvisory(context, topic, info); ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destination); - for(ActiveMQTopic advisoryDestination : advisoryDestinations) { + for (ActiveMQTopic advisoryDestination : advisoryDestinations) { try { next.removeDestination(context, advisoryDestination, -1); } catch (Exception expectedIfDestinationDidNotExistYet) { @@ -241,7 +225,7 @@ public class AdvisoryBroker extends BrokerFilter { ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination()); fireAdvisory(context, topic, info); ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destInfo.getDestination()); - for(ActiveMQTopic advisoryDestination : advisoryDestinations) { + for (ActiveMQTopic advisoryDestination : advisoryDestinations) { try { next.removeDestination(context, advisoryDestination, -1); } catch (Exception expectedIfDestinationDidNotExistYet) { @@ -269,7 +253,7 @@ public class AdvisoryBroker extends BrokerFilter { ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest); consumers.remove(info); if (!dest.isTemporary() || destinations.containsKey(dest)) { - fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand()); + fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand()); } } } @@ -277,7 +261,7 @@ public class AdvisoryBroker extends BrokerFilter { @Override public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); - DurableTopicSubscription sub = ((TopicRegion)((RegionBroker)next).getTopicRegion()).getDurableSubscription(key); + DurableTopicSubscription sub = ((TopicRegion) ((RegionBroker) next).getTopicRegion()).getDurableSubscription(key); super.removeSubscription(context, info); @@ -305,8 +289,8 @@ public class AdvisoryBroker extends BrokerFilter { if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) { ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest); producers.remove(info.getProducerId()); - if (!dest.isTemporary() || destinations.contains(dest)) { - fireProducerAdvisory(context, dest,topic, info.createRemoveCommand()); + if (!dest.isTemporary() || destinations.containsKey(dest)) { + fireProducerAdvisory(context, dest, topic, info.createRemoveCommand()); } } } @@ -315,7 +299,7 @@ public class AdvisoryBroker extends BrokerFilter { public void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription) { super.messageExpired(context, messageReference, subscription); try { - if(!messageReference.isAdvisory()) { + if (!messageReference.isAdvisory()) { ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination()); Message payload = messageReference.getMessage().copy(); payload.clearBody(); @@ -332,11 +316,11 @@ public class AdvisoryBroker extends BrokerFilter { public void messageConsumed(ConnectionContext context, MessageReference messageReference) { super.messageConsumed(context, messageReference); try { - if(!messageReference.isAdvisory()) { + if (!messageReference.isAdvisory()) { ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination()); Message payload = messageReference.getMessage().copy(); payload.clearBody(); - fireAdvisory(context, topic,payload); + fireAdvisory(context, topic, payload); } } catch (Exception e) { handleFireFailure("consumed", e); @@ -351,7 +335,7 @@ public class AdvisoryBroker extends BrokerFilter { ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination()); Message payload = messageReference.getMessage().copy(); payload.clearBody(); - fireAdvisory(context, topic,payload); + fireAdvisory(context, topic, payload); } } catch (Exception e) { handleFireFailure("delivered", e); @@ -368,7 +352,7 @@ public class AdvisoryBroker extends BrokerFilter { payload.clearBody(); ActiveMQMessage advisoryMessage = new ActiveMQMessage(); if (sub instanceof TopicSubscription) { - advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription)sub).discarded()); + advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription) sub).discarded()); } advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString()); fireAdvisory(context, topic, payload, null, advisoryMessage); @@ -379,8 +363,8 @@ public class AdvisoryBroker extends BrokerFilter { } @Override - public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) { - super.slowConsumer(context, destination,subs); + public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) { + super.slowConsumer(context, destination, subs); try { if (!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) { ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination()); @@ -394,7 +378,7 @@ public class AdvisoryBroker extends BrokerFilter { } @Override - public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) { + public void fastProducer(ConnectionContext context, ProducerInfo producerInfo, ActiveMQDestination destination) { super.fastProducer(context, producerInfo, destination); try { if (!AdvisorySupport.isAdvisoryTopic(destination)) { @@ -434,7 +418,7 @@ public class AdvisoryBroker extends BrokerFilter { ConnectionContext context = new ConnectionContext(); context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); context.setBroker(getBrokerService().getBroker()); - fireAdvisory(context, topic,null,null,advisoryMessage); + fireAdvisory(context, topic, null, null, advisoryMessage); } catch (Exception e) { handleFireFailure("now master broker", e); } @@ -446,11 +430,11 @@ public class AdvisoryBroker extends BrokerFilter { boolean wasDLQd = super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause); if (wasDLQd) { try { - if(!messageReference.isAdvisory()) { + if (!messageReference.isAdvisory()) { ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination()); Message payload = messageReference.getMessage().copy(); payload.clearBody(); - fireAdvisory(context, topic,payload); + fireAdvisory(context, topic, payload); } } catch (Exception e) { handleFireFailure("add to DLQ", e); @@ -463,20 +447,20 @@ public class AdvisoryBroker extends BrokerFilter { @Override public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) { try { - if (brokerInfo != null) { - ActiveMQMessage advisoryMessage = new ActiveMQMessage(); - advisoryMessage.setBooleanProperty("started", true); - advisoryMessage.setBooleanProperty("createdByDuplex", createdByDuplex); - advisoryMessage.setStringProperty("remoteIp", remoteIp); - networkBridges.putIfAbsent(brokerInfo, advisoryMessage); - - ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); - - ConnectionContext context = new ConnectionContext(); - context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); - context.setBroker(getBrokerService().getBroker()); - fireAdvisory(context, topic, brokerInfo, null, advisoryMessage); - } + if (brokerInfo != null) { + ActiveMQMessage advisoryMessage = new ActiveMQMessage(); + advisoryMessage.setBooleanProperty("started", true); + advisoryMessage.setBooleanProperty("createdByDuplex", createdByDuplex); + advisoryMessage.setStringProperty("remoteIp", remoteIp); + networkBridges.putIfAbsent(brokerInfo, advisoryMessage); + + ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); + + ConnectionContext context = new ConnectionContext(); + context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); + context.setBroker(getBrokerService().getBroker()); + fireAdvisory(context, topic, brokerInfo, null, advisoryMessage); + } } catch (Exception e) { handleFireFailure("network bridge started", e); } @@ -485,18 +469,18 @@ public class AdvisoryBroker extends BrokerFilter { @Override public void networkBridgeStopped(BrokerInfo brokerInfo) { try { - if (brokerInfo != null) { - ActiveMQMessage advisoryMessage = new ActiveMQMessage(); - advisoryMessage.setBooleanProperty("started", false); - networkBridges.remove(brokerInfo); - - ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); - - ConnectionContext context = new ConnectionContext(); - context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); - context.setBroker(getBrokerService().getBroker()); - fireAdvisory(context, topic, brokerInfo, null, advisoryMessage); - } + if (brokerInfo != null) { + ActiveMQMessage advisoryMessage = new ActiveMQMessage(); + advisoryMessage.setBooleanProperty("started", false); + networkBridges.remove(brokerInfo); + + ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); + + ConnectionContext context = new ConnectionContext(); + context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); + context.setBroker(getBrokerService().getBroker()); + fireAdvisory(context, topic, brokerInfo, null, advisoryMessage); + } } catch (Exception e) { handleFireFailure("network bridge stopped", e); } @@ -516,16 +500,16 @@ public class AdvisoryBroker extends BrokerFilter { fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); } - protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command) throws Exception { - fireConsumerAdvisory(context, consumerDestination,topic, command, null); + protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command) throws Exception { + fireConsumerAdvisory(context, consumerDestination, topic, command, null); } - protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { + protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { ActiveMQMessage advisoryMessage = new ActiveMQMessage(); int count = 0; - Set<Destination>set = getDestinations(consumerDestination); + Set<Destination> set = getDestinations(consumerDestination); if (set != null) { - for (Destination dest:set) { + for (Destination dest : set) { count += dest.getDestinationStatistics().getConsumers().getCount(); } } @@ -534,11 +518,11 @@ public class AdvisoryBroker extends BrokerFilter { fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); } - protected void fireProducerAdvisory(ConnectionContext context,ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception { - fireProducerAdvisory(context,producerDestination, topic, command, null); + protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception { + fireProducerAdvisory(context, producerDestination, topic, command, null); } - protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { + protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { ActiveMQMessage advisoryMessage = new ActiveMQMessage(); int count = 0; if (producerDestination != null) { http://git-wip-us.apache.org/repos/asf/activemq/blob/11781d3c/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java index 72da777..2c5f9cd 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -80,6 +81,19 @@ public class ConsumerListenerTest extends EmbeddedBrokerTestSupport implements C assertConsumerEvent(0, false); } + public void testConsumerEventsOnTemporaryDestination() throws Exception { + + Session s = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); + Destination dest = useTopic ? s.createTemporaryTopic() : s.createTemporaryQueue(); + consumerEventSource = new ConsumerEventSource(connection, dest); + consumerEventSource.setConsumerListener(this); + consumerEventSource.start(); + MessageConsumer consumer = s.createConsumer(dest); + assertConsumerEvent(1,true); + consumer.close(); + assertConsumerEvent(0,false); + } + public void onConsumerEvent(ConsumerEvent event) { eventQueue.add(event); } http://git-wip-us.apache.org/repos/asf/activemq/blob/11781d3c/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java index bc90cab..dfa1b5e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java @@ -21,10 +21,10 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; - import org.apache.activemq.EmbeddedBrokerTestSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,18 +47,18 @@ public class ProducerListenerTest extends EmbeddedBrokerTestSupport implements P producerEventSource.start(); consumerSession1 = createProducer(); - assertConsumerEvent(1, true); + assertProducerEvent(1, true); consumerSession2 = createProducer(); - assertConsumerEvent(2, true); + assertProducerEvent(2, true); consumerSession1.close(); consumerSession1 = null; - assertConsumerEvent(1, false); + assertProducerEvent(1, false); consumerSession2.close(); consumerSession2 = null; - assertConsumerEvent(0, false); + assertProducerEvent(0, false); } public void testListenWhileAlreadyConsumersActive() throws Exception { @@ -66,18 +66,33 @@ public class ProducerListenerTest extends EmbeddedBrokerTestSupport implements P consumerSession2 = createProducer(); producerEventSource.start(); - assertConsumerEvent(2, true); - assertConsumerEvent(2, true); + assertProducerEvent(2, true); + assertProducerEvent(2, true); consumerSession1.close(); consumerSession1 = null; - assertConsumerEvent(1, false); + assertProducerEvent(1, false); consumerSession2.close(); consumerSession2 = null; - assertConsumerEvent(0, false); + assertProducerEvent(0, false); + } + + public void testConsumerEventsOnTemporaryDestination() throws Exception { + + Session s = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); + Destination dest = useTopic ? s.createTemporaryTopic() : s.createTemporaryQueue(); + producerEventSource = new ProducerEventSource(connection, dest); + producerEventSource.setProducerListener(this); + producerEventSource.start(); + MessageProducer producer = s.createProducer(dest); + assertProducerEvent(1, true); + producer.close(); + assertProducerEvent(0, false); } + + @Override public void onProducerEvent(ProducerEvent event) { eventQueue.add(event); @@ -110,7 +125,7 @@ public class ProducerListenerTest extends EmbeddedBrokerTestSupport implements P super.tearDown(); } - protected void assertConsumerEvent(int count, boolean started) throws InterruptedException { + protected void assertProducerEvent(int count, boolean started) throws InterruptedException { ProducerEvent event = waitForProducerEvent(); assertEquals("Producer count", count, event.getProducerCount()); assertEquals("started", started, event.isStarted());
