Author: rgodfrey
Date: Sun Feb 21 23:59:55 2016
New Revision: 1731576

URL: http://svn.apache.org/viewvc?rev=1731576&view=rev
Log:
QPID-6764 : Only perform relevant notification checks on new message arrival

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1731576&r1=1731575&r2=1731576&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Sun Feb 21 23:59:55 2016
@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -235,7 +236,7 @@ public abstract class AbstractQueue<X ex
 
     private MessageGroupManager _messageGroupManager;
 
-    private QueueNotificationListener  _notificationListener;
+    private QueueNotificationListener  _notificationListener = NULL_NOTIFICATION_LISTENER;
     private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
 
     @ManagedAttributeField
@@ -1165,7 +1166,7 @@ public abstract class AbstractQueue<X ex
                 deliverAsync();
             }
 
-            checkForNotification(entry.getMessage());
+            checkForNotificationOnNewMessage(entry.getMessage());
         }
         finally
         {
@@ -2460,6 +2461,24 @@ public abstract class AbstractQueue<X ex
         final long estimatedQueueSize = _atomicQueueSize.get() + _atomicQueueCount.get() * _estimatedAverageMessageHeaderSize;
         _flowToDiskChecker.reportFlowToDiskStatusIfNecessary(estimatedQueueSize, _targetQueueSize.get());
 
+        final Set<NotificationCheck> perMessageChecks = new HashSet<>();
+        final Set<NotificationCheck> queueLevelChecks = new HashSet<>();
+
+        for(NotificationCheck check : getNotificationChecks())
+        {
+            if(check.isMessageSpecific())
+            {
+                perMessageChecks.add(check);
+            }
+            else
+            {
+                queueLevelChecks.add(check);
+            }
+        }
+        QueueNotificationListener listener = _notificationListener;
+        final long currentTime = System.currentTimeMillis();
+        final long thresholdTime = currentTime - getAlertRepeatGap();
+
         long cumulativeQueueSize = 0;
         while (queueListIterator.advance())
         {
@@ -2498,11 +2517,21 @@ public abstract class AbstractQueue<X ex
                         cumulativeQueueSize += msg.getSize() + _estimatedAverageMessageHeaderSize;
                         _flowToDiskChecker.flowToDiskIfNecessary(msg.getStoredMessage(), cumulativeQueueSize,
                                                                  _targetQueueSize.get());
-                        checkForNotification(msg);
+
+                        for(NotificationCheck check : perMessageChecks)
+                        {
+                            performNotificationCheck(msg, listener, currentTime, thresholdTime, check);
+                        }
                     }
                 }
             }
         }
+
+        for(NotificationCheck check : queueLevelChecks)
+        {
+            performNotificationCheck(null, listener, currentTime, thresholdTime, check);
+        }
+
     }
 
     @Override
@@ -2753,22 +2782,50 @@ public abstract class AbstractQueue<X ex
     /**
      * Checks if there is any notification to send to the listeners
      */
-    private void checkForNotification(ServerMessage<?> msg)
+    private void checkForNotification(ServerMessage<?> msg, final boolean arrivalCheck)
     {
         final Set<NotificationCheck> notificationChecks = getNotificationChecks();
         QueueNotificationListener  listener = _notificationListener;
-        if(listener == null)
+        if(!notificationChecks.isEmpty())
         {
-            listener = NULL_NOTIFICATION_LISTENER;
+            final long currentTime = System.currentTimeMillis();
+            final long thresholdTime = currentTime - getAlertRepeatGap();
+
+            for (NotificationCheck check : notificationChecks)
+            {
+                performNotificationCheck(msg, listener, currentTime, thresholdTime, check);
+            }
+        }
+    }
+
+    private void performNotificationCheck(final ServerMessage<?> msg,
+                                          final QueueNotificationListener listener,
+                                          final long currentTime,
+                                          final long thresholdTime,
+                                          final NotificationCheck check)
+    {
+        if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
+        {
+            if (check.notifyIfNecessary(msg, this, listener))
+            {
+                _lastNotificationTimes[check.ordinal()] = currentTime;
+            }
         }
-        if(listener != null && !notificationChecks.isEmpty())
+    }
+
+    private void checkForNotificationOnNewMessage(final ServerMessage<?> msg)
+    {
+        final Set<NotificationCheck> notificationChecks = getNotificationChecks();
+        QueueNotificationListener listener = _notificationListener;
+        if (!notificationChecks.isEmpty())
         {
             final long currentTime = System.currentTimeMillis();
             final long thresholdTime = currentTime - getAlertRepeatGap();
 
             for (NotificationCheck check : notificationChecks)
             {
-                if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
+                if (check.isCheckOnMessageArrival()
+                    && (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime)))
                 {
                     if (check.notifyIfNecessary(msg, this, listener))
                     {
@@ -2781,7 +2838,7 @@ public abstract class AbstractQueue<X ex
 
     public void setNotificationListener(QueueNotificationListener  listener)
     {
-        _notificationListener = listener;
+        _notificationListener = listener == null ? NULL_NOTIFICATION_LISTENER : listener;
     }
 
     public final  <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=1731576&r1=1731575&r2=1731576&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Sun Feb 21 23:59:55 2016
@@ -48,7 +48,7 @@ public enum NotificationCheck
             return false;
         }
     },
-    MESSAGE_SIZE_ALERT(true)
+    MESSAGE_SIZE_ALERT(true, true)
     {
         public boolean notifyIfNecessary(ServerMessage<?> msg, Queue<?> queue, QueueNotificationListener  listener)
         {
@@ -96,7 +96,7 @@ public enum NotificationCheck
         }
 
     },
-    MESSAGE_AGE_ALERT
+    MESSAGE_AGE_ALERT(false, true)
     {
         public boolean notifyIfNecessary(ServerMessage<?> msg, Queue<?> queue, QueueNotificationListener  listener)
         {
@@ -129,15 +129,18 @@ public enum NotificationCheck
     private static final Logger LOGGER = LoggerFactory.getLogger(NotificationCheck.class);
 
     private final boolean _messageSpecific;
+    private final boolean _checkOnMessageArrival;
+
 
     NotificationCheck()
     {
-        this(false);
+        this(false, true);
     }
 
-    NotificationCheck(boolean messageSpecific)
+    NotificationCheck(boolean messageSpecific, final boolean checkOnMessageArrival)
     {
         _messageSpecific = messageSpecific;
+        _checkOnMessageArrival = checkOnMessageArrival;
     }
 
     public boolean isMessageSpecific()
@@ -145,6 +148,11 @@ public enum NotificationCheck
         return _messageSpecific;
     }
 
+    public boolean isCheckOnMessageArrival()
+    {
+        return _checkOnMessageArrival;
+    }
+
     public abstract boolean notifyIfNecessary(ServerMessage<?> msg, Queue<?> queue, QueueNotificationListener  listener);
 
     //A bit of a hack, only for use until we do the logging listener



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org

Reply via email to