Author: dejanb
Date: Fri Jan 23 04:11:00 2009
New Revision: 737017
URL: http://svn.apache.org/viewvc?rev=737017&view=rev
Log:
fix for http://issues.apache.org/activemq/browse/AMQ-2016
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=737017&r1=737016&r2=737017&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Jan 23 04:11:00 2009
@@ -27,12 +27,16 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
+
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -96,6 +100,10 @@
private boolean strictOrderDispatch=false;
private QueueDispatchSelector dispatchSelector;
private boolean optimizedDispatch=false;
+ private boolean firstConsumer = false;
+ private int timeBeforeDispatchStarts = 0;
+ private int consumersBeforeDispatchStarts = 0;
+ private CountDownLatch consumersBeforeStartsLatch;
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() {
wakeup();
@@ -134,7 +142,7 @@
}
// If a VMPendingMessageCursor don't use the default Producer System
Usage
// since it turns into a shared blocking queue which can lead to a
network deadlock.
- // If we are ccursoring to disk..it's not and issue because it does
not block due
+ // If we are cursoring to disk..it's not an issue because it does not block due
// to large disk sizes.
if( messages instanceof VMPendingMessageCursor ) {
this.systemUsage = brokerService.getSystemUsage();
@@ -221,6 +229,18 @@
// needs to be synchronized - so no contention with dispatching
synchronized (consumers) {
+
+ // set a flag if this is the first consumer
+ if (consumers.size() == 0) {
+ firstConsumer = true;
+ } else {
+ firstConsumer = false;
+ }
+
+ if (consumersBeforeStartsLatch != null) {
+ consumersBeforeStartsLatch.countDown();
+ }
+
addToConsumerList(sub);
if (sub.getConsumerInfo().isExclusive()) {
Subscription exclusiveConsumer =
dispatchSelector.getExclusiveConsumer();
@@ -610,6 +630,22 @@
public void setOptimizedDispatch(boolean optimizedDispatch) {
this.optimizedDispatch = optimizedDispatch;
}
+ public int getTimeBeforeDispatchStarts() {
+ return timeBeforeDispatchStarts;
+ }
+
+ public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
+ this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
+ }
+
+ public int getConsumersBeforeDispatchStarts() {
+ return consumersBeforeDispatchStarts;
+ }
+
+ public void setConsumersBeforeDispatchStarts(int
consumersBeforeDispatchStarts) {
+ this.consumersBeforeDispatchStarts =
consumersBeforeDispatchStarts;
+ consumersBeforeStartsLatch = new
CountDownLatch(consumersBeforeDispatchStarts);
+ }
// Implementation methods
//
-------------------------------------------------------------------------
@@ -990,6 +1026,35 @@
}
}
+ if (firstConsumer) {
+ firstConsumer = false;
+ try {
+ if (consumersBeforeDispatchStarts > 0) {
+ int timeout = 1000; // wait one second
by default if consumer count isn't reached
+ if (timeBeforeDispatchStarts > 0) {
+ timeout =
timeBeforeDispatchStarts;
+ }
+ if
(consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
+ if (LOG.isDebugEnabled()) {
+
LOG.debug(consumers.size() + " consumers subscribed. Starting dispatch.");
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(timeout + "
ms elapsed and " + consumers.size() + " consumers subscribed. Starting
dispatch.");
+ }
+ }
+ }
+ if (timeBeforeDispatchStarts > 0 &&
consumersBeforeDispatchStarts <= 0) {
+
iteratingMutex.wait(timeBeforeDispatchStarts);
+ if (LOG.isDebugEnabled()) {
+
LOG.debug(timeBeforeDispatchStarts + " ms elapsed. Starting dispatch.");
+ }
+ }
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ }
+
boolean pageInMoreMessages = false;
synchronized (messages) {
pageInMoreMessages = !messages.isEmpty();
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=737017&r1=737016&r2=737017&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Fri Jan 23 04:11:00 2009
@@ -65,6 +65,8 @@
private boolean useConsumerPriority=true;
private boolean strictOrderDispatch=false;
private boolean lazyDispatch=false;
+ private int timeBeforeDispatchStarts = 0;
+ private int consumersBeforeDispatchStarts = 0;
private boolean advisoryForSlowConsumers;
private boolean advisdoryForFastProducers;
private boolean advisoryForDiscardingMessages;
@@ -93,7 +95,8 @@
queue.setStrictOrderDispatch(isStrictOrderDispatch());
queue.setOptimizedDispatch(isOptimizedDispatch());
queue.setLazyDispatch(isLazyDispatch());
-
+ queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts());
+
queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
}
public void configure(Topic topic) {
@@ -439,6 +442,22 @@
this.lazyDispatch = lazyDispatch;
}
+ public int getTimeBeforeDispatchStarts() {
+ return timeBeforeDispatchStarts;
+ }
+
+ public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
+ this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
+ }
+
+ public int getConsumersBeforeDispatchStarts() {
+ return consumersBeforeDispatchStarts;
+ }
+
+ public void setConsumersBeforeDispatchStarts(int
consumersBeforeDispatchStarts) {
+ this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
+ }
+
/**
* @return the advisoryForSlowConsumers
*/
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java?rev=737017&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java
Fri Jan 23 04:11:00 2009
@@ -0,0 +1,229 @@
+package org.apache.activemq.usecases;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JMSConsumerTest;
+import org.apache.activemq.JmsTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+
+public class MessageGroupDelayedTest extends JmsTestSupport {
+ public static final Log log =
LogFactory.getLog(MessageGroupDelayedTest.class);
+ protected Connection connection;
+ protected Session session;
+ protected MessageProducer producer;
+ protected Destination destination;
+
+ public int consumersBeforeDispatchStarts;
+ public int timeBeforeDispatchStarts;
+
+ BrokerService broker;
+ protected TransportConnector connector;
+
+ protected HashMap<String, Integer> messageCount = new HashMap<String,
Integer>();
+ protected HashMap<String, Set<String>> messageGroups = new HashMap<String,
Set<String>>();
+
+ public static Test suite() {
+ return suite(MessageGroupDelayedTest.class);
+ }
+
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
+
+ public void setUp() throws Exception {
+ broker = createBroker();
+ broker.start();
+ ActiveMQConnectionFactory connFactory = new
ActiveMQConnectionFactory(connector.getConnectUri());
+ //ActiveMQConnectionFactory connFactory = new
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ connection = connFactory.createConnection();
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ destination = new ActiveMQQueue("test-queue2");
+ producer = session.createProducer(destination);
+ connection.start();
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService service = new BrokerService();
+ service.setPersistent(false);
+ service.setUseJmx(false);
+
+ // Set up a default destination policy carrying the dispatch-delay settings under test.
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry policy = new PolicyEntry();
+ policy.setConsumersBeforeDispatchStarts(consumersBeforeDispatchStarts);
+ policy.setTimeBeforeDispatchStarts(timeBeforeDispatchStarts);
+ policyMap.setDefaultEntry(policy);
+ service.setDestinationPolicy(policyMap);
+
+ connector = service.addConnector("tcp://localhost:0");
+ return service;
+ }
+
+ public void tearDown() throws Exception {
+ producer.close();
+ session.close();
+ connection.close();
+ }
+
+
+
+ public void initCombosForTestDelayedDirectConnectionListener() {
+ addCombinationValues("consumersBeforeDispatchStarts", new Object[]
{0, 3, 5});
+ addCombinationValues("timeBeforeDispatchStarts", new Object[] {0,
100});
+ }
+
+ public void testDelayedDirectConnectionListener() throws Exception {
+
+ for(int i = 0; i < 10; i++) {
+ Message msga = session.createTextMessage("hello a");
+ msga.setStringProperty("JMSXGroupID", "A");
+ producer.send(msga);
+ Message msgb = session.createTextMessage("hello b");
+ msgb.setStringProperty("JMSXGroupID", "B");
+ producer.send(msgb);
+ Message msgc = session.createTextMessage("hello c");
+ msgc.setStringProperty("JMSXGroupID", "C");
+ producer.send(msgc);
+ }
+ log.info("30 messages sent to group A/B/C");
+
+ int[] counters = {10, 10, 10};
+
+ CountDownLatch startSignal = new CountDownLatch(1);
+ CountDownLatch doneSignal = new CountDownLatch(1);
+
+ messageCount.put("worker1", 0);
+ messageGroups.put("worker1", new HashSet<String>());
+ Worker worker1 = new Worker(connection, destination, "worker1",
startSignal, doneSignal, counters, messageCount, messageGroups);
+ messageCount.put("worker2", 0);
+ messageGroups.put("worker2", new HashSet<String>());
+ Worker worker2 = new Worker(connection, destination, "worker2",
startSignal, doneSignal, counters, messageCount, messageGroups);
+ messageCount.put("worker3", 0);
+ messageGroups.put("worker3", new HashSet<String>());
+ Worker worker3 = new Worker(connection, destination, "worker3",
startSignal, doneSignal, counters, messageCount, messageGroups);
+
+
+ new Thread(worker1).start();
+ new Thread(worker2).start();
+ new Thread(worker3).start();
+
+ startSignal.countDown();
+ doneSignal.await();
+
+ // check results
+ if (consumersBeforeDispatchStarts == 0 && timeBeforeDispatchStarts == 0) {
+ log.info("Ignoring results because both parameters are 0");
+ return;
+ }
+
+ for (String worker: messageCount.keySet()) {
+ log.info("worker " + worker + " received " + messageCount.get(worker) +
" messages from groups " + messageGroups.get(worker));
+ assertEquals(10, messageCount.get(worker).intValue());
+ assertEquals(1, messageGroups.get(worker).size());
+ }
+
+ }
+
+ private static final class Worker implements Runnable {
+ private Connection connection = null;
+ private Destination queueName = null;
+ private String workerName = null;
+ private CountDownLatch startSignal = null;
+ private CountDownLatch doneSignal = null;
+ private int[] counters = null;
+ private HashMap<String, Integer> messageCount;
+ private HashMap<String, Set<String>>messageGroups;
+
+
+ private Worker(Connection connection, Destination queueName, String
workerName, CountDownLatch startSignal, CountDownLatch doneSignal, int[]
counters, HashMap<String, Integer> messageCount, HashMap<String,
Set<String>>messageGroups) {
+ this.connection = connection;
+ this.queueName = queueName;
+ this.workerName = workerName;
+ this.startSignal = startSignal;
+ this.doneSignal = doneSignal;
+ this.counters = counters;
+ this.messageCount = messageCount;
+ this.messageGroups = messageGroups;
+ }
+
+ private void update(String group) {
+ int msgCount = messageCount.get(workerName);
+ messageCount.put(workerName, msgCount + 1);
+ Set<String> groups = messageGroups.get(workerName);
+ groups.add(group);
+ messageGroups.put(workerName, groups);
+ }
+
+ public void run() {
+
+ try {
+ log.info(workerName);
+ startSignal.await();
+ Session sess = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = sess.createConsumer(queueName);
+
+ while(true) {
+ if(counters[0] == 0 && counters[1] == 0 && counters[2] == 0 ) {
+ doneSignal.countDown();
+ log.info(workerName + " done...");
+ break;
+ }
+
+ Message msg = consumer.receive(500);
+ if(msg == null)
+ continue;
+
+ String group = msg.getStringProperty("JMSXGroupID");
+ boolean first = msg.getBooleanProperty("JMSXGroupFirstForConsumer");
+
+ if("A".equals(group)){
+ --counters[0];
+ update(group);
+ Thread.sleep(500);
+ }
+ else if("B".equals(group)) {
+ --counters[1];
+ update(group);
+ Thread.sleep(100);
+ }
+ else if("C".equals(group)) {
+ --counters[2];
+ update(group);
+ Thread.sleep(10);
+ }
+ else {
+ log.warn("unknown group");
+ }
+ if (counters[0] != 0 || counters[1] != 0 || counters[2] != 0 ) {
+ msg.acknowledge();
+ }
+ }
+ consumer.close();
+ sess.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}