Author: kwall
Date: Sat Jun 30 12:31:49 2012
New Revision: 1355721

URL: http://svn.apache.org/viewvc?rev=1355721&view=rev
Log:
QPID-4902: NPE from SimpleAMQQueue and RejectedExecutionExecution handling

Guard against NPE in setLastSeenEntry.  #execute() method change to ignore REE 
in the case where the
queue has already been stopped (logged at ERROR otherwise).  Change 
Subscription*#_queueContext member
to volatile as this member is get/set from different threads during the queue's 
lifecycle

Modified:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java?rev=1355721&r1=1355720&r2=1355721&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
 Sat Jun 30 12:31:49 2012
@@ -67,7 +67,7 @@ class Subscription_1_0 implements Subscr
     private final QueueEntry.SubscriptionAssignedState _assignedState = new 
QueueEntry.SubscriptionAssignedState(this);
     private final long _id;
     private final boolean _acquires;
-    private AMQQueue.Context _queueContext;
+    private volatile AMQQueue.Context _queueContext;
     private Map<String, Object> _properties = new ConcurrentHashMap<String, 
Object>();
     private ReentrantLock _stateChangeLock = new ReentrantLock();
 

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1355721&r1=1355720&r2=1355721&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 Sat Jun 30 12:31:49 2012
@@ -30,6 +30,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -296,7 +297,22 @@ public class SimpleAMQQueue implements A
 
     public void execute(Runnable runnable)
     {
-        _asyncDelivery.execute(runnable);
+        try
+        {
+            _asyncDelivery.execute(runnable);
+        }
+        catch (RejectedExecutionException ree)
+        {
+            if (_stopped.get())
+            {
+                // Ignore - SubFlusherRunner or QueueRunner submitted 
execution as queue was being stopped.
+            }
+            else
+            {
+                _logger.error("Unexpected rejected execution", ree);
+                throw ree;
+            }
+        }
     }
 
     public AMQShortString getNameShortString()
@@ -863,12 +879,15 @@ public class SimpleAMQQueue implements A
     private void setLastSeenEntry(final Subscription sub, final QueueEntry 
entry)
     {
         QueueContext subContext = (QueueContext) sub.getQueueContext();
-        QueueEntry releasedEntry = subContext.getReleasedEntry();
-
-        QueueContext._lastSeenUpdater.set(subContext, entry);
-        if(releasedEntry == entry)
+        if (subContext != null)
         {
-           QueueContext._releasedUpdater.compareAndSet(subContext, 
releasedEntry, null);
+            QueueEntry releasedEntry = subContext.getReleasedEntry();
+
+            QueueContext._lastSeenUpdater.set(subContext, entry);
+            if(releasedEntry == entry)
+            {
+               QueueContext._releasedUpdater.compareAndSet(subContext, 
releasedEntry, null);
+            }
         }
     }
 

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1355721&r1=1355720&r2=1355721&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
 Sat Jun 30 12:31:49 2012
@@ -76,7 +76,7 @@ public abstract class SubscriptionImpl i
 
 
     private final AtomicReference<State> _state = new 
AtomicReference<State>(State.ACTIVE);
-    private AMQQueue.Context _queueContext;
+    private volatile AMQQueue.Context _queueContext;
 
     private final ClientDeliveryMethod _deliveryMethod;
     private final RecordDeliveryMethod _recordMethod;
@@ -470,11 +470,6 @@ public abstract class SubscriptionImpl i
         _deleted.set(true);
     }
 
-    public boolean filtersMessages()
-    {
-        return _filters != null || _noLocal;
-    }
-
     public boolean hasInterest(QueueEntry entry)
     {
         //check that the message hasn't been rejected
@@ -510,13 +505,6 @@ public abstract class SubscriptionImpl i
 
     }
 
-    private String id = String.valueOf(System.identityHashCode(this));
-
-    private String debugIdentity()
-    {
-        return id;
-    }
-
     private boolean checkFilters(QueueEntry msg)
     {
         return (_filters == null) || _filters.allAllow(msg);

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1355721&r1=1355720&r2=1355721&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
 Sat Jun 30 12:31:49 2012
@@ -98,7 +98,7 @@ public class Subscription_0_10 implement
     private final Lock _stateChangeLock = new ReentrantLock();
 
     private final AtomicReference<State> _state = new 
AtomicReference<State>(State.ACTIVE);
-    private AMQQueue.Context _queueContext;
+    private volatile AMQQueue.Context _queueContext;
     private final AtomicBoolean _deleted = new AtomicBoolean(false);
 
 

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1355721&r1=1355720&r2=1355721&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
 Sat Jun 30 12:31:49 2012
@@ -48,7 +48,7 @@ public class MockSubscription implements
     private AMQShortString tag = new AMQShortString("mocktag");
     private AMQQueue queue = null;
     private StateListener _listener = null;
-    private AMQQueue.Context _queueContext = null;
+    private volatile AMQQueue.Context _queueContext = null;
     private State _state = State.ACTIVE;
     private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
     private final Lock _stateChangeLock = new ReentrantLock();



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to