[
https://issues.apache.org/jira/browse/AMQ-3607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
James Furness updated AMQ-3607:
-------------------------------
Description:
The below test case tests slow consumer handling with a variety of topic
policies and SessionFactory/ConnectionFactory settings. The expectation is that
a normal (i.e. fast) consumer will continue to receive messages whilst a slow
consumer is blocking.
Without a prefetch limit, the expected behaviour is seen with
setOptimizeAcknowledge both true and false.
If a prefetch limit is set, setOptimizeAcknowledge(true) causes the normal/fast
consumer to miss messages whilst the slow consumer is blocking.
Would be nice to be able to turn on OptimiseAcknowledge for performance
reasons, however it is also necessary to set the prefetch limit in order to
trigger SlowConsumerStrategy/MessageEvictionStrategySupport logic.
{code:title=testDefaultSettings}
Publisher: Send 0
SlowConsumer: Receive 0
FastConsumer: Receive 0
testDefaultSettings: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
testDefaultSettings: Whilst slow consumer blocked:
- SlowConsumer Received: 1 [0]
- FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
testDefaultSettings: After slow consumer unblocked:
- SlowConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
- FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
{code}
{code:title=testDefaultSettingsWithOptimiseAcknowledge}
testDefaultSettingsWithOptimiseAcknowledge: Publisher Sent: 30 [0, 1, 2, 3, 4,
5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25,
26, 27, 28, 29]
testDefaultSettingsWithOptimiseAcknowledge: Whilst slow consumer blocked:
- SlowConsumer Received: 1 [0]
- FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
testDefaultSettingsWithOptimiseAcknowledge: After slow consumer unblocked:
- SlowConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
- FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
{code}
{code:title=testBounded}
testBounded: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,
14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
testBounded: Whilst slow consumer blocked:
- SlowConsumer Received: 1 [0]
- FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
testBounded: After slow consumer unblocked:
- SlowConsumer Received: 10 [0, 1, 2, 3, 4, 25, 26, 27, 28, 29]
- FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
{code}
{code:title=testBoundedWithOptimiseAcknowledge}
testBoundedWithOptimiseAcknowledge: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7,
8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27,
28, 29]
testBoundedWithOptimiseAcknowledge: Whilst slow consumer blocked:
- SlowConsumer Received: 1 [0]
- FastConsumer Received: 5 [0, 1, 2, 3, 4]
testBoundedWithOptimiseAcknowledge: After slow consumer unblocked:
- SlowConsumer Received: 5 [0, 1, 2, 3, 4]
- FastConsumer Received: 5 [0, 1, 2, 3, 4]
java.lang.AssertionError: Fast consumer missed messages whilst slow consumer
was blocking expected:<[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]> but was:<[0, 1, 2, 3,
4]>
{code}
{code:title=ActiveMQSlowConsumerManualTest.java}
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import
org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author James Furness
*/
public class ActiveMQSlowConsumerManualTest {
private static final int PORT = 12345;
private static final ActiveMQTopic TOPIC = new ActiveMQTopic("TOPIC");
private static final String URL = "nio://localhost:" + PORT +
"?socket.tcpNoDelay=true";
@Test(timeout = 60000)
public void testDefaultSettings() throws Exception {
runTest("testDefaultSettings", 30, -1, -1, false, false, false, false);
}
@Test(timeout = 60000)
public void testDefaultSettingsWithOptimiseAcknowledge() throws Exception {
runTest("testDefaultSettingsWithOptimiseAcknowledge", 30, -1, -1,
false, false, true, false);
}
@Test(timeout = 60000)
public void testBounded() throws Exception {
runTest("testBounded", 30, 5, 5, false, false, false, false);
}
@Test(timeout = 60000)
public void testBoundedWithOptimiseAcknowledge() throws Exception {
runTest("testBoundedWithOptimiseAcknowledge", 30, 5, 5, false, false,
true, false);
}
public void runTest(String name, int sendMessageCount, int prefetchLimit,
int messageLimit, boolean evictOldestMessage, boolean disableFlowControl,
boolean optimizeAcknowledge, boolean persistent) throws Exception {
BrokerService broker = createBroker(persistent);
broker.setDestinationPolicy(buildPolicy(TOPIC, prefetchLimit,
messageLimit, evictOldestMessage, disableFlowControl));
broker.start();
// Slow consumer
Session slowConsumerSession = buildSession("SlowConsumer", URL,
optimizeAcknowledge);
final CountDownLatch blockSlowConsumer = new CountDownLatch(1);
final AtomicInteger slowConsumerReceiveCount = new AtomicInteger();
final List<Integer> slowConsumerReceived = sendMessageCount <= 1000 ?
new ArrayList<Integer>() : null;
MessageConsumer slowConsumer = createSubscriber(slowConsumerSession,
new MessageListener() {
@Override
public void onMessage(Message message) {
try {
slowConsumerReceiveCount.incrementAndGet();
int count = Integer.parseInt(((TextMessage)
message).getText());
if (slowConsumerReceived != null)
slowConsumerReceived.add(count);
if (count % 10000 == 0)
System.out.println("SlowConsumer: Receive " + count);
blockSlowConsumer.await();
} catch (Exception ignored) {}
}
}
);
// Fast consumer
Session fastConsumerSession = buildSession("FastConsumer", URL,
optimizeAcknowledge);
final AtomicInteger fastConsumerReceiveCount = new AtomicInteger();
final List<Integer> fastConsumerReceived = sendMessageCount <= 1000 ?
new ArrayList<Integer>() : null;
MessageConsumer fastConsumer = createSubscriber(fastConsumerSession,
new MessageListener() {
@Override
public void onMessage(Message message) {
try {
fastConsumerReceiveCount.incrementAndGet();
int count = Integer.parseInt(((TextMessage)
message).getText());
if (fastConsumerReceived != null)
fastConsumerReceived.add(count);
if (count % 10000 == 0)
System.out.println("FastConsumer: Receive " + count);
} catch (Exception ignored) {}
}
}
);
// Wait for consumers to connect
Thread.sleep(500);
// Publisher
AtomicInteger sentCount = new AtomicInteger();
List<Integer> sent = sendMessageCount <= 1000 ? new
ArrayList<Integer>() : null;
Session publisherSession = buildSession("Publisher", URL,
optimizeAcknowledge);
MessageProducer publisher = createPublisher(publisherSession);
for (int i = 0; i < sendMessageCount; i++) {
sentCount.incrementAndGet();
if (sent != null) sent.add(i);
if (i % 10000 == 0) System.out.println("Publisher: Send " + i);
publisher.send(publisherSession.createTextMessage(Integer.toString(i)));
}
// Wait for messages to arrive
Thread.sleep(500);
System.out.println(name + ": Publisher Sent: " + sentCount + " " +
sent);
System.out.println(name + ": Whilst slow consumer blocked:");
System.out.println("\t\t- SlowConsumer Received: " +
slowConsumerReceiveCount + " " + slowConsumerReceived);
System.out.println("\t\t- FastConsumer Received: " +
fastConsumerReceiveCount + " " + fastConsumerReceived);
// Unblock slow consumer
blockSlowConsumer.countDown();
// Wait for messages to arrive
Thread.sleep(500);
System.out.println(name + ": After slow consumer unblocked:");
System.out.println("\t\t- SlowConsumer Received: " +
slowConsumerReceiveCount + " " + slowConsumerReceived);
System.out.println("\t\t- FastConsumer Received: " +
fastConsumerReceiveCount + " " + fastConsumerReceived);
System.out.println();
publisher.close();
publisherSession.close();
slowConsumer.close();
slowConsumerSession.close();
fastConsumer.close();
fastConsumerSession.close();
broker.stop();
Assert.assertEquals("Fast consumer missed messages whilst slow consumer
was blocking", sent, fastConsumerReceived);
Assert.assertEquals("Slow consumer received incorrect message count",
Math.min(sendMessageCount, prefetchLimit + (messageLimit > 0 ? messageLimit :
Integer.MAX_VALUE)), slowConsumerReceived.size());
}
private static BrokerService createBroker(boolean persistent) throws
Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName("TestBroker");
broker.setPersistent(persistent);
broker.addConnector(URL);
return broker;
}
private static MessageConsumer createSubscriber(Session session,
MessageListener messageListener) throws JMSException {
MessageConsumer consumer = session.createConsumer(TOPIC);
consumer.setMessageListener(messageListener);
return consumer;
}
private static MessageProducer createPublisher(Session session) throws
JMSException {
MessageProducer producer = session.createProducer(TOPIC);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
return producer;
}
private static Session buildSession(String clientId, String url, boolean
optimizeAcknowledge) throws JMSException {
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(url);
connectionFactory.setCopyMessageOnSend(false);
connectionFactory.setDisableTimeStampsByDefault(true);
connectionFactory.setOptimizeAcknowledge(optimizeAcknowledge);
Connection connection = connectionFactory.createConnection();
connection.setClientID(clientId);
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
connection.start();
return session;
}
private static PolicyMap buildPolicy(ActiveMQTopic topic, int
prefetchLimit, int messageLimit, boolean evictOldestMessage, boolean
disableFlowControl) {
PolicyMap policyMap = new PolicyMap();
PolicyEntry policyEntry = new PolicyEntry();
if (evictOldestMessage) {
policyEntry.setMessageEvictionStrategy(new
OldestMessageEvictionStrategy());
}
if (disableFlowControl) {
policyEntry.setProducerFlowControl(false);
}
if (prefetchLimit > 0) {
policyEntry.setTopicPrefetch(prefetchLimit);
}
if (messageLimit > 0) {
ConstantPendingMessageLimitStrategy messageLimitStrategy = new
ConstantPendingMessageLimitStrategy();
messageLimitStrategy.setLimit(messageLimit);
policyEntry.setPendingMessageLimitStrategy(messageLimitStrategy);
}
policyMap.put(topic, policyEntry);
return policyMap;
}
}
{code}
was:
The below test case tests slow consumer handling with a variety of topic
policies and SessionFactory/ConnectionFactory settings. The expectation is that
a normal (i.e. fast) consumer will continue to receive messages whilst a slow
consumer is blocking.
Without a prefetch limit, the expected behaviour is seen with
setOptimizeAcknowledge both true and false.
If a prefetch limit is set, setOptimizeAcknowledge(true) causes the normal/fast
consumer to miss messages whilst the slow consumer is blocking.
Would be nice to be able to turn on OptimiseAcknowledge for performance
reasons, however it is also necessary to set the prefetch limit in order to
trigger SlowConsumerStrategy/MessageEvictionStrategySupport logic.
{code}
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import
org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author James Furness
*/
public class ActiveMQSlowConsumerManualTest {
private static final int PORT = 12345;
private static final ActiveMQTopic TOPIC = new ActiveMQTopic("TOPIC");
private static final String URL = "nio://localhost:" + PORT +
"?socket.tcpNoDelay=true";
@Test(timeout = 60000)
public void testDefaultSettings() throws Exception {
runTest("testDefaultSettings", 30, -1, -1, false, false, false, false);
}
@Test(timeout = 60000)
public void testDefaultSettingsWithOptimiseAcknowledge() throws Exception {
runTest("testDefaultSettingsWithOptimiseAcknowledge", 30, -1, -1,
false, false, true, false);
}
@Test(timeout = 60000)
public void testBounded() throws Exception {
runTest("testBounded", 30, 5, 5, false, false, false, false);
}
@Test(timeout = 60000)
public void testBoundedWithOptimiseAcknowledge() throws Exception {
runTest("testBoundedWithOptimiseAcknowledge", 30, 5, 5, false, false,
true, false);
}
public void runTest(String name, int sendMessageCount, int prefetchLimit,
int messageLimit, boolean evictOldestMessage, boolean disableFlowControl,
boolean optimizeAcknowledge, boolean persistent) throws Exception {
BrokerService broker = createBroker(persistent);
broker.setDestinationPolicy(buildPolicy(TOPIC, prefetchLimit,
messageLimit, evictOldestMessage, disableFlowControl));
broker.start();
// Slow consumer
Session slowConsumerSession = buildSession("SlowConsumer", URL,
optimizeAcknowledge);
final CountDownLatch blockSlowConsumer = new CountDownLatch(1);
final AtomicInteger slowConsumerReceiveCount = new AtomicInteger();
final List<Integer> slowConsumerReceived = sendMessageCount <= 1000 ?
new ArrayList<Integer>() : null;
MessageConsumer slowConsumer = createSubscriber(slowConsumerSession,
new MessageListener() {
@Override
public void onMessage(Message message) {
try {
slowConsumerReceiveCount.incrementAndGet();
int count = Integer.parseInt(((TextMessage)
message).getText());
if (slowConsumerReceived != null)
slowConsumerReceived.add(count);
if (count % 10000 == 0)
System.out.println("SlowConsumer: Receive " + count);
blockSlowConsumer.await();
} catch (Exception ignored) {}
}
}
);
// Fast consumer
Session fastConsumerSession = buildSession("FastConsumer", URL,
optimizeAcknowledge);
final AtomicInteger fastConsumerReceiveCount = new AtomicInteger();
final List<Integer> fastConsumerReceived = sendMessageCount <= 1000 ?
new ArrayList<Integer>() : null;
MessageConsumer fastConsumer = createSubscriber(fastConsumerSession,
new MessageListener() {
@Override
public void onMessage(Message message) {
try {
fastConsumerReceiveCount.incrementAndGet();
int count = Integer.parseInt(((TextMessage)
message).getText());
if (fastConsumerReceived != null)
fastConsumerReceived.add(count);
if (count % 10000 == 0)
System.out.println("FastConsumer: Receive " + count);
} catch (Exception ignored) {}
}
}
);
// Wait for consumers to connect
Thread.sleep(500);
// Publisher
AtomicInteger sentCount = new AtomicInteger();
List<Integer> sent = sendMessageCount <= 1000 ? new
ArrayList<Integer>() : null;
Session publisherSession = buildSession("Publisher", URL,
optimizeAcknowledge);
MessageProducer publisher = createPublisher(publisherSession);
for (int i = 0; i < sendMessageCount; i++) {
sentCount.incrementAndGet();
if (sent != null) sent.add(i);
if (i % 10000 == 0) System.out.println("Publisher: Send " + i);
publisher.send(publisherSession.createTextMessage(Integer.toString(i)));
}
// Wait for messages to arrive
Thread.sleep(500);
System.out.println(name + ": Publisher Sent: " + sentCount + " " +
sent);
System.out.println(name + ": Whilst slow consumer blocked:");
System.out.println("\t\t- SlowConsumer Received: " +
slowConsumerReceiveCount + " " + slowConsumerReceived);
System.out.println("\t\t- FastConsumer Received: " +
fastConsumerReceiveCount + " " + fastConsumerReceived);
// Unblock slow consumer
blockSlowConsumer.countDown();
// Wait for messages to arrive
Thread.sleep(500);
System.out.println(name + ": After slow consumer unblocked:");
System.out.println("\t\t- SlowConsumer Received: " +
slowConsumerReceiveCount + " " + slowConsumerReceived);
System.out.println("\t\t- FastConsumer Received: " +
fastConsumerReceiveCount + " " + fastConsumerReceived);
System.out.println();
publisher.close();
publisherSession.close();
slowConsumer.close();
slowConsumerSession.close();
fastConsumer.close();
fastConsumerSession.close();
broker.stop();
Assert.assertEquals("Fast consumer missed messages whilst slow consumer
was blocking", sent, fastConsumerReceived);
Assert.assertEquals("Slow consumer received incorrect message count",
Math.min(sendMessageCount, prefetchLimit + (messageLimit > 0 ? messageLimit :
Integer.MAX_VALUE)), slowConsumerReceived.size());
}
private static BrokerService createBroker(boolean persistent) throws
Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName("TestBroker");
broker.setPersistent(persistent);
broker.addConnector(URL);
return broker;
}
private static MessageConsumer createSubscriber(Session session,
MessageListener messageListener) throws JMSException {
MessageConsumer consumer = session.createConsumer(TOPIC);
consumer.setMessageListener(messageListener);
return consumer;
}
private static MessageProducer createPublisher(Session session) throws
JMSException {
MessageProducer producer = session.createProducer(TOPIC);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
return producer;
}
private static Session buildSession(String clientId, String url, boolean
optimizeAcknowledge) throws JMSException {
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(url);
connectionFactory.setCopyMessageOnSend(false);
connectionFactory.setDisableTimeStampsByDefault(true);
connectionFactory.setOptimizeAcknowledge(optimizeAcknowledge);
Connection connection = connectionFactory.createConnection();
connection.setClientID(clientId);
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
connection.start();
return session;
}
private static PolicyMap buildPolicy(ActiveMQTopic topic, int
prefetchLimit, int messageLimit, boolean evictOldestMessage, boolean
disableFlowControl) {
PolicyMap policyMap = new PolicyMap();
PolicyEntry policyEntry = new PolicyEntry();
if (evictOldestMessage) {
policyEntry.setMessageEvictionStrategy(new
OldestMessageEvictionStrategy());
}
if (disableFlowControl) {
policyEntry.setProducerFlowControl(false);
}
if (prefetchLimit > 0) {
policyEntry.setTopicPrefetch(prefetchLimit);
}
if (messageLimit > 0) {
ConstantPendingMessageLimitStrategy messageLimitStrategy = new
ConstantPendingMessageLimitStrategy();
messageLimitStrategy.setLimit(messageLimit);
policyEntry.setPendingMessageLimitStrategy(messageLimitStrategy);
}
policyMap.put(topic, policyEntry);
return policyMap;
}
}
{code}
> Setting OptimiseAcknowledge on a queue with a prefetch limit causes
> normal/fast consumers to miss messages when a slow consumer is blocking
> -------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: AMQ-3607
> URL: https://issues.apache.org/jira/browse/AMQ-3607
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.5.0
> Environment: Java: 1.6.0_26-b03-383.jdk
> Reporter: James Furness
>
> The below test case tests slow consumer handling with a variety of topic
> policies and SessionFactory/ConnectionFactory settings. The expectation is
> that a normal (i.e. fast) consumer will continue to receive messages whilst a
> slow consumer is blocking.
> Without a prefetch limit, the expected behaviour is seen with
> setOptimizeAcknowledge both true and false.
> If a prefetch limit is set, setOptimizeAcknowledge(true) causes the
> normal/fast consumer to miss messages whilst the slow consumer is blocking.
> Would be nice to be able to turn on OptimiseAcknowledge for performance
> reasons, however it is also necessary to set the prefetch limit in order to
> trigger SlowConsumerStrategy/MessageEvictionStrategySupport logic.
> {code:title=testDefaultSettings}
> Publisher: Send 0
> SlowConsumer: Receive 0
> FastConsumer: Receive 0
> testDefaultSettings: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
> 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> testDefaultSettings: Whilst slow consumer blocked:
> - SlowConsumer Received: 1 [0]
> - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
> 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> testDefaultSettings: After slow consumer unblocked:
> - SlowConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
> 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
> 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> {code}
> {code:title=testDefaultSettingsWithOptimiseAcknowledge}
> testDefaultSettingsWithOptimiseAcknowledge: Publisher Sent: 30 [0, 1, 2, 3,
> 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24,
> 25, 26, 27, 28, 29]
> testDefaultSettingsWithOptimiseAcknowledge: Whilst slow consumer blocked:
> - SlowConsumer Received: 1 [0]
> - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
> 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> testDefaultSettingsWithOptimiseAcknowledge: After slow consumer unblocked:
> - SlowConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
> 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
> 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> {code}
> {code:title=testBounded}
> testBounded: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
> 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> testBounded: Whilst slow consumer blocked:
> - SlowConsumer Received: 1 [0]
> - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
> 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> testBounded: After slow consumer unblocked:
> - SlowConsumer Received: 10 [0, 1, 2, 3, 4, 25, 26, 27, 28, 29]
> - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
> 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> {code}
> {code:title=testBoundedWithOptimiseAcknowledge}
> testBoundedWithOptimiseAcknowledge: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6,
> 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26,
> 27, 28, 29]
> testBoundedWithOptimiseAcknowledge: Whilst slow consumer blocked:
> - SlowConsumer Received: 1 [0]
> - FastConsumer Received: 5 [0, 1, 2, 3, 4]
> testBoundedWithOptimiseAcknowledge: After slow consumer unblocked:
> - SlowConsumer Received: 5 [0, 1, 2, 3, 4]
> - FastConsumer Received: 5 [0, 1, 2, 3, 4]
> java.lang.AssertionError: Fast consumer missed messages whilst slow consumer
> was blocking expected:<[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
> 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]> but was:<[0, 1, 2,
> 3, 4]>
> {code}
> {code:title=ActiveMQSlowConsumerManualTest.java}
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.broker.BrokerService;
> import
> org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
> import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
> import org.apache.activemq.broker.region.policy.PolicyEntry;
> import org.apache.activemq.broker.region.policy.PolicyMap;
> import org.apache.activemq.command.ActiveMQTopic;
> import org.junit.Assert;
> import org.junit.Ignore;
> import org.junit.Test;
> import javax.jms.Connection;
> import javax.jms.DeliveryMode;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.MessageProducer;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.concurrent.CountDownLatch;
> import java.util.concurrent.atomic.AtomicInteger;
> /**
> * @author James Furness
> */
> public class ActiveMQSlowConsumerManualTest {
> private static final int PORT = 12345;
> private static final ActiveMQTopic TOPIC = new ActiveMQTopic("TOPIC");
> private static final String URL = "nio://localhost:" + PORT +
> "?socket.tcpNoDelay=true";
> @Test(timeout = 60000)
> public void testDefaultSettings() throws Exception {
> runTest("testDefaultSettings", 30, -1, -1, false, false, false,
> false);
> }
> @Test(timeout = 60000)
> public void testDefaultSettingsWithOptimiseAcknowledge() throws Exception
> {
> runTest("testDefaultSettingsWithOptimiseAcknowledge", 30, -1, -1,
> false, false, true, false);
> }
> @Test(timeout = 60000)
> public void testBounded() throws Exception {
> runTest("testBounded", 30, 5, 5, false, false, false, false);
> }
> @Test(timeout = 60000)
> public void testBoundedWithOptimiseAcknowledge() throws Exception {
> runTest("testBoundedWithOptimiseAcknowledge", 30, 5, 5, false, false,
> true, false);
> }
> public void runTest(String name, int sendMessageCount, int prefetchLimit,
> int messageLimit, boolean evictOldestMessage, boolean disableFlowControl,
> boolean optimizeAcknowledge, boolean persistent) throws Exception {
> BrokerService broker = createBroker(persistent);
> broker.setDestinationPolicy(buildPolicy(TOPIC, prefetchLimit,
> messageLimit, evictOldestMessage, disableFlowControl));
> broker.start();
> // Slow consumer
> Session slowConsumerSession = buildSession("SlowConsumer", URL,
> optimizeAcknowledge);
> final CountDownLatch blockSlowConsumer = new CountDownLatch(1);
> final AtomicInteger slowConsumerReceiveCount = new AtomicInteger();
> final List<Integer> slowConsumerReceived = sendMessageCount <= 1000 ?
> new ArrayList<Integer>() : null;
> MessageConsumer slowConsumer = createSubscriber(slowConsumerSession,
> new MessageListener() {
> @Override
> public void onMessage(Message message) {
> try {
> slowConsumerReceiveCount.incrementAndGet();
> int count = Integer.parseInt(((TextMessage)
> message).getText());
> if (slowConsumerReceived != null)
> slowConsumerReceived.add(count);
> if (count % 10000 == 0)
> System.out.println("SlowConsumer: Receive " + count);
> blockSlowConsumer.await();
> } catch (Exception ignored) {}
> }
> }
> );
> // Fast consumer
> Session fastConsumerSession = buildSession("FastConsumer", URL,
> optimizeAcknowledge);
> final AtomicInteger fastConsumerReceiveCount = new AtomicInteger();
> final List<Integer> fastConsumerReceived = sendMessageCount <= 1000 ?
> new ArrayList<Integer>() : null;
> MessageConsumer fastConsumer = createSubscriber(fastConsumerSession,
> new MessageListener() {
> @Override
> public void onMessage(Message message) {
> try {
> fastConsumerReceiveCount.incrementAndGet();
> int count = Integer.parseInt(((TextMessage)
> message).getText());
> if (fastConsumerReceived != null)
> fastConsumerReceived.add(count);
> if (count % 10000 == 0)
> System.out.println("FastConsumer: Receive " + count);
> } catch (Exception ignored) {}
> }
> }
> );
> // Wait for consumers to connect
> Thread.sleep(500);
> // Publisher
> AtomicInteger sentCount = new AtomicInteger();
> List<Integer> sent = sendMessageCount <= 1000 ? new
> ArrayList<Integer>() : null;
> Session publisherSession = buildSession("Publisher", URL,
> optimizeAcknowledge);
> MessageProducer publisher = createPublisher(publisherSession);
> for (int i = 0; i < sendMessageCount; i++) {
> sentCount.incrementAndGet();
> if (sent != null) sent.add(i);
> if (i % 10000 == 0) System.out.println("Publisher: Send " + i);
>
> publisher.send(publisherSession.createTextMessage(Integer.toString(i)));
> }
> // Wait for messages to arrive
> Thread.sleep(500);
> System.out.println(name + ": Publisher Sent: " + sentCount + " " +
> sent);
> System.out.println(name + ": Whilst slow consumer blocked:");
> System.out.println("\t\t- SlowConsumer Received: " +
> slowConsumerReceiveCount + " " + slowConsumerReceived);
> System.out.println("\t\t- FastConsumer Received: " +
> fastConsumerReceiveCount + " " + fastConsumerReceived);
> // Unblock slow consumer
> blockSlowConsumer.countDown();
> // Wait for messages to arrive
> Thread.sleep(500);
> System.out.println(name + ": After slow consumer unblocked:");
> System.out.println("\t\t- SlowConsumer Received: " +
> slowConsumerReceiveCount + " " + slowConsumerReceived);
> System.out.println("\t\t- FastConsumer Received: " +
> fastConsumerReceiveCount + " " + fastConsumerReceived);
> System.out.println();
> publisher.close();
> publisherSession.close();
> slowConsumer.close();
> slowConsumerSession.close();
> fastConsumer.close();
> fastConsumerSession.close();
> broker.stop();
> Assert.assertEquals("Fast consumer missed messages whilst slow
> consumer was blocking", sent, fastConsumerReceived);
> Assert.assertEquals("Slow consumer received incorrect message count",
> Math.min(sendMessageCount, prefetchLimit + (messageLimit > 0 ? messageLimit :
> Integer.MAX_VALUE)), slowConsumerReceived.size());
> }
> private static BrokerService createBroker(boolean persistent) throws
> Exception {
> BrokerService broker = new BrokerService();
> broker.setBrokerName("TestBroker");
> broker.setPersistent(persistent);
> broker.addConnector(URL);
> return broker;
> }
> private static MessageConsumer createSubscriber(Session session,
> MessageListener messageListener) throws JMSException {
> MessageConsumer consumer = session.createConsumer(TOPIC);
> consumer.setMessageListener(messageListener);
> return consumer;
> }
> private static MessageProducer createPublisher(Session session) throws
> JMSException {
> MessageProducer producer = session.createProducer(TOPIC);
> producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
> return producer;
> }
> private static Session buildSession(String clientId, String url, boolean
> optimizeAcknowledge) throws JMSException {
> ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(url);
> connectionFactory.setCopyMessageOnSend(false);
> connectionFactory.setDisableTimeStampsByDefault(true);
> connectionFactory.setOptimizeAcknowledge(optimizeAcknowledge);
> Connection connection = connectionFactory.createConnection();
> connection.setClientID(clientId);
> Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
> connection.start();
> return session;
> }
> private static PolicyMap buildPolicy(ActiveMQTopic topic, int
> prefetchLimit, int messageLimit, boolean evictOldestMessage, boolean
> disableFlowControl) {
> PolicyMap policyMap = new PolicyMap();
> PolicyEntry policyEntry = new PolicyEntry();
> if (evictOldestMessage) {
> policyEntry.setMessageEvictionStrategy(new
> OldestMessageEvictionStrategy());
> }
> if (disableFlowControl) {
> policyEntry.setProducerFlowControl(false);
> }
> if (prefetchLimit > 0) {
> policyEntry.setTopicPrefetch(prefetchLimit);
> }
> if (messageLimit > 0) {
> ConstantPendingMessageLimitStrategy messageLimitStrategy = new
> ConstantPendingMessageLimitStrategy();
> messageLimitStrategy.setLimit(messageLimit);
> policyEntry.setPendingMessageLimitStrategy(messageLimitStrategy);
> }
> policyMap.put(topic, policyEntry);
> return policyMap;
> }
> }
> {code}
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators:
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira