Author: rgodfrey
Date: Sun Feb 21 23:59:55 2016
New Revision: 1731576
URL: http://svn.apache.org/viewvc?rev=1731576&view=rev
Log:
QPID-6764 : Only perform relevant notification checks on new message arrival
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1731576&r1=1731575&r2=1731576&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Sun Feb 21 23:59:55 2016
@@ -29,6 +29,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -235,7 +236,7 @@ public abstract class AbstractQueue<X ex
private MessageGroupManager _messageGroupManager;
- private QueueNotificationListener _notificationListener;
+ private QueueNotificationListener _notificationListener =
NULL_NOTIFICATION_LISTENER;
private final long[] _lastNotificationTimes = new
long[NotificationCheck.values().length];
@ManagedAttributeField
@@ -1165,7 +1166,7 @@ public abstract class AbstractQueue<X ex
deliverAsync();
}
- checkForNotification(entry.getMessage());
+ checkForNotificationOnNewMessage(entry.getMessage());
}
finally
{
@@ -2460,6 +2461,24 @@ public abstract class AbstractQueue<X ex
final long estimatedQueueSize = _atomicQueueSize.get() +
_atomicQueueCount.get() * _estimatedAverageMessageHeaderSize;
_flowToDiskChecker.reportFlowToDiskStatusIfNecessary(estimatedQueueSize,
_targetQueueSize.get());
+ final Set<NotificationCheck> perMessageChecks = new HashSet<>();
+ final Set<NotificationCheck> queueLevelChecks = new HashSet<>();
+
+ for(NotificationCheck check : getNotificationChecks())
+ {
+ if(check.isMessageSpecific())
+ {
+ perMessageChecks.add(check);
+ }
+ else
+ {
+ queueLevelChecks.add(check);
+ }
+ }
+ QueueNotificationListener listener = _notificationListener;
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - getAlertRepeatGap();
+
long cumulativeQueueSize = 0;
while (queueListIterator.advance())
{
@@ -2498,11 +2517,21 @@ public abstract class AbstractQueue<X ex
cumulativeQueueSize += msg.getSize() +
_estimatedAverageMessageHeaderSize;
_flowToDiskChecker.flowToDiskIfNecessary(msg.getStoredMessage(),
cumulativeQueueSize,
_targetQueueSize.get());
- checkForNotification(msg);
+
+ for(NotificationCheck check : perMessageChecks)
+ {
+ performNotificationCheck(msg, listener,
currentTime, thresholdTime, check);
+ }
}
}
}
}
+
+ for(NotificationCheck check : queueLevelChecks)
+ {
+ performNotificationCheck(null, listener, currentTime,
thresholdTime, check);
+ }
+
}
@Override
@@ -2753,22 +2782,50 @@ public abstract class AbstractQueue<X ex
/**
* Checks if there is any notification to send to the listeners
*/
- private void checkForNotification(ServerMessage<?> msg)
+ private void checkForNotification(ServerMessage<?> msg, final boolean
arrivalCheck)
{
final Set<NotificationCheck> notificationChecks =
getNotificationChecks();
QueueNotificationListener listener = _notificationListener;
- if(listener == null)
+ if(!notificationChecks.isEmpty())
{
- listener = NULL_NOTIFICATION_LISTENER;
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - getAlertRepeatGap();
+
+ for (NotificationCheck check : notificationChecks)
+ {
+ performNotificationCheck(msg, listener, currentTime,
thresholdTime, check);
+ }
+ }
+ }
+
+ private void performNotificationCheck(final ServerMessage<?> msg,
+ final QueueNotificationListener
listener,
+ final long currentTime,
+ final long thresholdTime,
+ final NotificationCheck check)
+ {
+ if (check.isMessageSpecific() ||
(_lastNotificationTimes[check.ordinal()] < thresholdTime))
+ {
+ if (check.notifyIfNecessary(msg, this, listener))
+ {
+ _lastNotificationTimes[check.ordinal()] = currentTime;
+ }
}
- if(listener != null && !notificationChecks.isEmpty())
+ }
+
+ private void checkForNotificationOnNewMessage(final ServerMessage<?> msg)
+ {
+ final Set<NotificationCheck> notificationChecks =
getNotificationChecks();
+ QueueNotificationListener listener = _notificationListener;
+ if (!notificationChecks.isEmpty())
{
final long currentTime = System.currentTimeMillis();
final long thresholdTime = currentTime - getAlertRepeatGap();
for (NotificationCheck check : notificationChecks)
{
- if (check.isMessageSpecific() ||
(_lastNotificationTimes[check.ordinal()] < thresholdTime))
+ if (check.isCheckOnMessageArrival()
+ && (check.isMessageSpecific() ||
(_lastNotificationTimes[check.ordinal()] < thresholdTime)))
{
if (check.notifyIfNecessary(msg, this, listener))
{
@@ -2781,7 +2838,7 @@ public abstract class AbstractQueue<X ex
public void setNotificationListener(QueueNotificationListener listener)
{
- _notificationListener = listener;
+ _notificationListener = listener == null ? NULL_NOTIFICATION_LISTENER
: listener;
}
public final <M extends ServerMessage<? extends StorableMessageMetaData>>
int send(final M message,
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=1731576&r1=1731575&r2=1731576&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
Sun Feb 21 23:59:55 2016
@@ -48,7 +48,7 @@ public enum NotificationCheck
return false;
}
},
- MESSAGE_SIZE_ALERT(true)
+ MESSAGE_SIZE_ALERT(true, true)
{
public boolean notifyIfNecessary(ServerMessage<?> msg, Queue<?> queue,
QueueNotificationListener listener)
{
@@ -96,7 +96,7 @@ public enum NotificationCheck
}
},
- MESSAGE_AGE_ALERT
+ MESSAGE_AGE_ALERT(false, true)
{
public boolean notifyIfNecessary(ServerMessage<?> msg, Queue<?> queue,
QueueNotificationListener listener)
{
@@ -129,15 +129,18 @@ public enum NotificationCheck
private static final Logger LOGGER =
LoggerFactory.getLogger(NotificationCheck.class);
private final boolean _messageSpecific;
+ private final boolean _checkOnMessageArrival;
+
NotificationCheck()
{
- this(false);
+ this(false, true);
}
- NotificationCheck(boolean messageSpecific)
+ NotificationCheck(boolean messageSpecific, final boolean
checkOnMessageArrival)
{
_messageSpecific = messageSpecific;
+ _checkOnMessageArrival = checkOnMessageArrival;
}
public boolean isMessageSpecific()
@@ -145,6 +148,11 @@ public enum NotificationCheck
return _messageSpecific;
}
+ public boolean isCheckOnMessageArrival()
+ {
+ return _checkOnMessageArrival;
+ }
+
public abstract boolean notifyIfNecessary(ServerMessage<?> msg, Queue<?>
queue, QueueNotificationListener listener);
//A bit of a hack, only for use until we do the logging listener
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]