Author: kwall
Date: Sun Jul 26 19:39:08 2015
New Revision: 1692751
URL: http://svn.apache.org/r1692751
Log:
QPID-6656: [Java Broker] Ensure queue target size is promptly assigned to new queues
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1692751&r1=1692750&r2=1692751&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Sun Jul 26 19:39:08 2015
@@ -1364,7 +1364,10 @@ public abstract class AbstractQueue<X ex
@Override
public void setTargetSize(final long targetSize)
{
- _targetQueueSize.set(targetSize);
+ if (_targetQueueSize.compareAndSet(_targetQueueSize.get(), targetSize))
+ {
+ _logger.debug("Queue '{}' target size : {}", getName(), targetSize);
+ }
}
public long getTotalDequeuedMessages()
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1692751&r1=1692750&r2=1692751&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
Sun Jul 26 19:39:08 2015
@@ -217,6 +217,8 @@ public abstract class AbstractVirtualHos
_dataReceived = new StatisticsCounter("bytes-received-" + getName());
_principal = new VirtualHostPrincipal(this);
_fileSystemSpaceChecker = new FileSystemSpaceChecker();
+
+ addChangeListener(new TargetSizeAssigningListener());
}
public void onValidate()
@@ -1132,6 +1134,53 @@ public abstract class AbstractVirtualHos
return null;
}
+ private class TargetSizeAssigningListener implements ConfigurationChangeListener
+ {
+ @Override
+ public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
+ {
+ if (child instanceof Queue)
+ {
+ allocateTargetSizeToQueues();
+ }
+ }
+
+ @Override
+ public void childRemoved(final ConfiguredObject<?> object,
+ final ConfiguredObject<?> child)
+ {
+ if (child instanceof Queue)
+ {
+ allocateTargetSizeToQueues();
+ }
+ }
+
+ @Override
+ public void stateChanged(final ConfiguredObject<?> object,
+ final State oldState,
+ final State newState)
+ {
+ }
+
+ @Override
+ public void attributeSet(final ConfiguredObject<?> object,
+ final String attributeName,
+ final Object oldAttributeValue,
+ final Object newAttributeValue)
+ {
+ }
+
+ @Override
+ public void bulkChangeStart(final ConfiguredObject<?> object)
+ {
+ }
+
+ @Override
+ public void bulkChangeEnd(final ConfiguredObject<?> object)
+ {
+ }
+ }
+
private class VirtualHostHouseKeepingTask extends HouseKeepingTask
{
public VirtualHostHouseKeepingTask()
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1692751&r1=1692750&r2=1692751&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
Sun Jul 26 19:39:08 2015
@@ -61,7 +61,6 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
-import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
@@ -92,8 +91,7 @@ abstract class AbstractQueueTestBase ext
_virtualHost =
BrokerTestHelper.createVirtualHost(getClass().getName());
- Map<String,Object> attributes = new HashMap<String,
Object>(_arguments);
- attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
+ Map<String,Object> attributes = new HashMap<>(_arguments);
attributes.put(Queue.NAME, _qname);
attributes.put(Queue.OWNER, _owner);
@@ -122,8 +120,7 @@ abstract class AbstractQueueTestBase ext
_queue.close();
try
{
- Map<String,Object> attributes = new HashMap<String,
Object>(_arguments);
- attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
+ Map<String,Object> attributes = new HashMap<>(_arguments);
_queue = _virtualHost.createQueue(attributes);
assertNull("Queue was created", _queue);
@@ -134,14 +131,14 @@ abstract class AbstractQueueTestBase ext
e.getMessage().contains("name"));
}
- Map<String,Object> attributes = new HashMap<String,
Object>(_arguments);
- attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
+ Map<String,Object> attributes = new HashMap<>(_arguments);
attributes.put(Queue.NAME, "differentName");
_queue = _virtualHost.createQueue(attributes);
assertNotNull("Queue was not created", _queue);
}
+
public void testGetVirtualHost()
{
assertEquals("Virtual host was wrong", _virtualHost,
_queue.getVirtualHost());
@@ -842,15 +839,15 @@ abstract class AbstractQueueTestBase ext
AMQQueue queue = _virtualHost.createQueue(attributes);
- assertEquals("TTL has not been overriden", 60000l,
getExpirationOnQueue(queue, 50000l, 0l));
+ assertEquals("TTL has not been overridden", 60000l,
getExpirationOnQueue(queue, 50000l, 0l));
- assertEquals("TTL has not been overriden", 60000l,
getExpirationOnQueue(queue, 50000l, 65000l));
+ assertEquals("TTL has not been overridden", 60000l,
getExpirationOnQueue(queue, 50000l, 65000l));
- assertEquals("TTL has been incorrectly overriden", 55000l,
getExpirationOnQueue(queue, 50000l, 55000l));
+ assertEquals("TTL has been incorrectly overridden", 55000l,
getExpirationOnQueue(queue, 50000l, 55000l));
long tooLateExpiration = System.currentTimeMillis() + 20000l;
- assertTrue("TTL has not been overriden", tooLateExpiration !=
getExpirationOnQueue(queue, 0l, tooLateExpiration));
+ assertTrue("TTL has not been overridden", tooLateExpiration !=
getExpirationOnQueue(queue, 0l, tooLateExpiration));
long acceptableExpiration = System.currentTimeMillis() + 5000l;
@@ -864,19 +861,19 @@ abstract class AbstractQueueTestBase ext
queue = _virtualHost.createQueue(attributes);
- assertEquals("TTL has been overriden incorrectly", 0l,
getExpirationOnQueue(queue, 50000l, 0l));
+ assertEquals("TTL has been overridden incorrectly", 0l,
getExpirationOnQueue(queue, 50000l, 0l));
- assertEquals("TTL has been overriden incorrectly", 65000l,
getExpirationOnQueue(queue, 50000l, 65000l));
+ assertEquals("TTL has been overridden incorrectly", 65000l,
getExpirationOnQueue(queue, 50000l, 65000l));
assertEquals("TTL has not been overriden", 60000l,
getExpirationOnQueue(queue, 50000l, 55000l));
long unacceptableExpiration = System.currentTimeMillis() + 5000l;
- assertTrue("TTL has not been overriden", unacceptableExpiration !=
getExpirationOnQueue(queue, 0l, tooLateExpiration));
+ assertTrue("TTL has not been overridden", unacceptableExpiration !=
getExpirationOnQueue(queue, 0l, tooLateExpiration));
acceptableExpiration = System.currentTimeMillis() + 20000l;
- assertEquals("TTL has been incorrectly overriden",
acceptableExpiration, getExpirationOnQueue(queue, 0l, acceptableExpiration));
+ assertEquals("TTL has been incorrectly overridden",
acceptableExpiration, getExpirationOnQueue(queue, 0l, acceptableExpiration));
// Test the scenarios where both the minimum and maximum TTL have been
set
@@ -888,11 +885,11 @@ abstract class AbstractQueueTestBase ext
queue = _virtualHost.createQueue(attributes);
- assertEquals("TTL has not been overriden", 70000l,
getExpirationOnQueue(queue, 50000l, 0l));
+ assertEquals("TTL has not been overridden", 70000l,
getExpirationOnQueue(queue, 50000l, 0l));
- assertEquals("TTL has been overriden incorrectly", 65000l,
getExpirationOnQueue(queue, 50000l, 65000l));
+ assertEquals("TTL has been overridden incorrectly", 65000l,
getExpirationOnQueue(queue, 50000l, 65000l));
- assertEquals("TTL has not been overriden", 60000l,
getExpirationOnQueue(queue, 50000l, 55000l));
+ assertEquals("TTL has not been overridden", 60000l,
getExpirationOnQueue(queue, 50000l, 55000l));
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1692751&r1=1692750&r2=1692751&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
Sun Jul 26 19:39:08 2015
@@ -25,7 +25,6 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -46,8 +45,7 @@ public class StandardQueueTest extends A
{
getQueue().close();
getQueue().deleteAndReturnCount();
- Map<String,Object> queueAttributes = new HashMap<String, Object>();
- queueAttributes.put(Queue.ID, UUID.randomUUID());
+ Map<String,Object> queueAttributes = new HashMap<>();
queueAttributes.put(Queue.NAME, getQname());
queueAttributes.put(Queue.LIFETIME_POLICY,
LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
final StandardQueueImpl queue = new StandardQueueImpl(queueAttributes,
getVirtualHost());
@@ -69,8 +67,7 @@ public class StandardQueueTest extends A
public void testActiveConsumerCount() throws Exception
{
- Map<String,Object> queueAttributes = new HashMap<String, Object>();
- queueAttributes.put(Queue.ID, UUID.randomUUID());
+ Map<String,Object> queueAttributes = new HashMap<>();
queueAttributes.put(Queue.NAME, "testActiveConsumerCount");
queueAttributes.put(Queue.OWNER, "testOwner");
final StandardQueueImpl queue = new StandardQueueImpl(queueAttributes,
getVirtualHost());
@@ -177,8 +174,7 @@ public class StandardQueueTest extends A
int messageNumber = 4;
int dequeueMessageIndex = 1;
- Map<String,Object> queueAttributes = new HashMap<String, Object>();
- queueAttributes.put(Queue.ID, UUID.randomUUID());
+ Map<String,Object> queueAttributes = new HashMap<>();
queueAttributes.put(Queue.NAME, "test");
// create queue with overridden method deliverAsync
StandardQueueImpl testQueue = new StandardQueueImpl(queueAttributes,
getVirtualHost());
@@ -260,8 +256,7 @@ public class StandardQueueTest extends A
private static Map<String,Object> attributes()
{
- Map<String,Object> attributes = new HashMap<String, Object>();
- attributes.put(Queue.ID, UUID.randomUUID());
+ Map<String,Object> attributes = new HashMap<>();
attributes.put(Queue.NAME, "test");
attributes.put(Queue.DURABLE, false);
attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]