Repository: activemq Updated Branches: refs/heads/master 2809befff -> 01b1f7f69
[AMQ-3233] respect policy entry blockedProducerWarningInterval for flow control warning, 0 disables and Xmillis makes it periodic, default period of 30s is not unlike the existing once behaviour. fix and tests Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/01b1f7f6 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/01b1f7f6 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/01b1f7f6 Branch: refs/heads/master Commit: 01b1f7f6945ba0897732b9eaae2ee1c9d50faf07 Parents: 2809bef Author: gtully <[email protected]> Authored: Fri Mar 3 11:23:23 2017 +0000 Committer: gtully <[email protected]> Committed: Fri Mar 3 11:23:23 2017 +0000 ---------------------------------------------------------------------- .../activemq/broker/region/BaseDestination.java | 21 +++-- .../apache/activemq/broker/region/Queue.java | 7 +- .../apache/activemq/broker/region/Topic.java | 3 +- .../activemq/ProducerFlowControlTest.java | 50 +++++++++- .../usecases/TopicProducerFlowControlTest.java | 97 +++++++++++++------- 5 files changed, 131 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/01b1f7f6/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index fa5ae49..aa2f7b5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -68,7 +68,7 @@ public abstract class BaseDestination implements Destination { protected MemoryUsage memoryUsage; private boolean producerFlowControl = true; private boolean alwaysRetroactive = false; - protected boolean warnOnProducerFlowControl = true; + protected long lastBlockedProducerWarnTime = 0l; protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL; private int maxProducersToAudit = 1024; @@ -683,7 +683,6 @@ public abstract class BaseDestination implements Destination { } } else { long start = System.currentTimeMillis(); - long nextWarn = start; producerBrokerExchange.blockingOnFlowControl(true); destinationStatistics.getBlockedSends().increment(); while (!usage.waitForSpace(1000, highWaterMark)) { @@ -691,10 +690,8 @@ public abstract class BaseDestination implements Destination { throw new IOException("Connection closed, send aborted."); } - long now = System.currentTimeMillis(); - if (now >= nextWarn) { - getLog().info("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((now - start) / 1000))}); - nextWarn = now + blockedProducerWarningInterval; + if (isFlowControlLogRequired()) { + getLog().info("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((System.currentTimeMillis() - start) / 1000))}); } } long finish = System.currentTimeMillis(); @@ -705,6 +702,18 @@ public abstract class BaseDestination implements Destination { } } + protected boolean isFlowControlLogRequired() { + boolean answer = false; + if (blockedProducerWarningInterval > 0) { + long now = System.currentTimeMillis(); + if (lastBlockedProducerWarnTime + blockedProducerWarningInterval <= now) { + lastBlockedProducerWarnTime = now; + answer = true; + } + } + return answer; + } + protected abstract Logger getLog(); public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) { http://git-wip-us.apache.org/repos/asf/activemq/blob/01b1f7f6/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 2b5c0c3..3ead89d 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -628,12 +628,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index isFull(context, memoryUsage); fastProducer(context, producerInfo); if (isProducerFlowControl() && context.isProducerFlowControl()) { - if (warnOnProducerFlowControl) { - warnOnProducerFlowControl = false; + if (isFlowControlLogRequired()) { LOG.info("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", - memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount()); - } + memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount()); + } if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " http://git-wip-us.apache.org/repos/asf/activemq/blob/01b1f7f6/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index c553e8c..8b46475 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -382,8 +382,7 @@ public class Topic extends BaseDestination implements Task { if (isProducerFlowControl() && context.isProducerFlowControl()) { - if (warnOnProducerFlowControl) { - warnOnProducerFlowControl = false; + if (isFlowControlLogRequired()) { LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit()); } http://git-wip-us.apache.org/repos/asf/activemq/blob/01b1f7f6/activemq-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java index 70e15ec..7cf05d3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java @@ -20,7 +20,9 @@ import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.JMSException; @@ -31,12 +33,18 @@ import javax.jms.TextMessage; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.util.DefaultTestAppender; +import org.apache.log4j.Appender; +import org.apache.log4j.Level; +import org.apache.log4j.spi.LoggingEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -247,6 +255,44 @@ public class ProducerFlowControlTest extends JmsTestSupport { assertFalse(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS)); } + public void testDisableWarning() throws Exception { + final AtomicInteger warnings = new AtomicInteger(); + Appender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + if (event.getLevel().equals(Level.INFO) && event.getMessage().toString().contains("Usage Manager Memory Limit")) { + LOG.info("received log message: " + event.getMessage()); + warnings.incrementAndGet(); + } + } + }; + org.apache.log4j.Logger log4jLogger = + org.apache.log4j.Logger.getLogger(Queue.class); + log4jLogger.addAppender(appender); + try { + ConnectionFactory factory = createConnectionFactory(); + connection = (ActiveMQConnection)factory.createConnection(); + connections.add(connection); + connection.start(); + + fillQueue(queueB); + assertEquals(1, warnings.get()); + + broker.getDestinationPolicy().getDefaultEntry().setBlockedProducerWarningInterval(0); + warnings.set(0); + + // new connection b/c other is blocked + connection = (ActiveMQConnection)factory.createConnection(); + connections.add(connection); + connection.start(); + fillQueue(new ActiveMQQueue("SomeOtherQueueToPickUpNewPolicy")); + assertEquals(0, warnings.get()); + + } finally { + log4jLogger.removeAppender(appender); + } + } + private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException { final AtomicBoolean done = new AtomicBoolean(true); final AtomicBoolean keepGoing = new AtomicBoolean(true); @@ -334,7 +380,9 @@ public class ProducerFlowControlTest extends JmsTestSupport { } protected void tearDown() throws Exception { - if (connection != null) { + for (Connection c : connections) { + // force error on blocked connections + ActiveMQConnection connection = (ActiveMQConnection) c; TcpTransport t = (TcpTransport)connection.getTransport().narrow(TcpTransport.class); t.getTransportListener().onException(new IOException("Disposed.")); connection.getTransport().stop(); http://git-wip-us.apache.org/repos/asf/activemq/blob/01b1f7f6/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java index 90b6c92..1574ec9 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java @@ -33,10 +33,15 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.DefaultTestAppender; import org.apache.activemq.util.Wait; +import org.apache.log4j.Appender; +import org.apache.log4j.Level; +import org.apache.log4j.spi.LoggingEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +75,7 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis tpe.setMemoryLimit(destinationMemLimit); tpe.setProducerFlowControl(true); tpe.setAdvisoryWhenFull(true); + tpe.setBlockedProducerWarningInterval(2000); pm.setPolicyEntries(Arrays.asList(new PolicyEntry[]{tpe})); @@ -128,49 +134,72 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis } }); - // Start producing the test messages - final Session session = connectionFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageProducer producer = session.createProducer(destination); + final AtomicInteger warnings = new AtomicInteger(); + Appender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + if (event.getLevel().equals(Level.INFO) && event.getMessage().toString().contains("Usage Manager memory limit reached")) { + LOG.info("received log message: " + event.getMessage()); + warnings.incrementAndGet(); + } + } + }; + org.apache.log4j.Logger log4jLogger = + org.apache.log4j.Logger.getLogger(Topic.class); + log4jLogger.addAppender(appender); + try { - Thread producingThread = new Thread("Producing Thread") { - public void run() { - try { - for (long i = 0; i < numMessagesToSend; i++) { - producer.send(session.createTextMessage("test")); + // Start producing the test messages + final Session session = connectionFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(destination); - long count = produced.incrementAndGet(); - if (count % 10000 == 0) { - LOG.info("Produced " + count + " messages"); - } - } - } catch (Throwable ex) { - ex.printStackTrace(); - } finally { + Thread producingThread = new Thread("Producing Thread") { + public void run() { try { - producer.close(); - session.close(); - } catch (Exception e) { + for (long i = 0; i < numMessagesToSend; i++) { + producer.send(session.createTextMessage("test")); + + long count = produced.incrementAndGet(); + if (count % 10000 == 0) { + LOG.info("Produced " + count + " messages"); + } + } + } catch (Throwable ex) { + ex.printStackTrace(); + } finally { + try { + producer.close(); + session.close(); + } catch (Exception e) { + } } } - } - }; + }; - producingThread.start(); + producingThread.start(); - Wait.waitFor(new Wait.Condition() { - public boolean isSatisified() throws Exception { - return consumed.get() == numMessagesToSend; - } - }, 5 * 60 * 1000); // give it plenty of time before failing + Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + return consumed.get() == numMessagesToSend; + } + }, 5 * 60 * 1000); // give it plenty of time before failing - assertEquals("Didn't produce all messages", numMessagesToSend, produced.get()); - assertEquals("Didn't consume all messages", numMessagesToSend, consumed.get()); + assertEquals("Didn't produce all messages", numMessagesToSend, produced.get()); + assertEquals("Didn't consume all messages", numMessagesToSend, consumed.get()); - assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() { - public boolean isSatisified() throws Exception { - return blockedCounter.get() > 0; - } - }, 5 * 1000)); + assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + return blockedCounter.get() > 0; + } + }, 5 * 1000)); + + LOG.info("BlockedCount: " + blockedCounter.get() + ", Warnings:" + warnings.get()); + assertTrue("got a few warnings", warnings.get() > 1); + assertTrue("warning limited", warnings.get() < blockedCounter.get()); + + } finally { + log4jLogger.removeAppender(appender); + } } protected Destination createDestination(Session listenerSession) throws Exception {
