Author: kwall
Date: Thu Feb 12 15:18:16 2015
New Revision: 1659288

URL: http://svn.apache.org/r1659288
Log:
0-10 queue browser fix.

Modified:
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1659288&r1=1659287&r2=1659288&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
 Thu Feb 12 15:18:16 2015
@@ -316,6 +316,7 @@ class QueueConsumerImpl
     public final void flush()
     {
         _queue.flushConsumer(this);
+        _target.processPending();
     }
 
     public boolean resend(final QueueEntry entry)

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1659288&r1=1659287&r2=1659288&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
 Thu Feb 12 15:18:16 2015
@@ -286,7 +286,16 @@ public class SelectorThread extends Thre
                                 @Override
                                 public void run()
                                 {
-                                    processConnection(connection);
+                                    String currentName = Thread.currentThread().getName();
+                                    try
+                                    {
+                                        Thread.currentThread().setName("NCS-"+connection.getRemoteAddress().toString());
+                                        processConnection(connection);
+                                    }
+                                    finally
+                                    {
+                                        Thread.currentThread().setName(currentName);
+                                    }
                                 }
                             });
         }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1659288&r1=1659287&r2=1659288&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
 Thu Feb 12 15:18:16 2015
@@ -61,7 +61,8 @@ public class ProtocolEngine_0_10  extend
     private long _lastWriteTime = _createTime;
     private volatile boolean _transportBlockedForWriting;
 
-    private volatile boolean _messageAssignmentSuspended;
+    private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
+
     private final AtomicBoolean _stateChanged = new AtomicBoolean();
     private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>();
 
@@ -81,13 +82,15 @@ public class ProtocolEngine_0_10  extend
     @Override
     public boolean isMessageAssignmentSuspended()
     {
-        return _messageAssignmentSuspended;
+        Thread lock = _messageAssignmentSuspended.get();
+        return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread();
     }
 
     @Override
     public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
     {
-        _messageAssignmentSuspended = messageAssignmentSuspended;
+        _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
+
         if(!messageAssignmentSuspended)
         {
            for(AMQSessionModel<?,?> session : _connection.getSessionModels())

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1659288&r1=1659287&r2=1659288&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
 Thu Feb 12 15:18:16 2015
@@ -209,19 +209,20 @@ public class AMQProtocolEngine implement
     private long _maxMessageSize;
     private volatile boolean _transportBlockedForWriting;
 
-    private volatile boolean _messageAssignmentSuspended;
+    private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
 
 
     @Override
     public boolean isMessageAssignmentSuspended()
     {
-        return _messageAssignmentSuspended;
+        Thread lock = _messageAssignmentSuspended.get();
+        return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread();
     }
 
     @Override
     public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
     {
-        _messageAssignmentSuspended = messageAssignmentSuspended;
+        _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
         if(!messageAssignmentSuspended)
         {
             for(AMQSessionModel<?,?> session : getSessionModels())

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java?rev=1659288&r1=1659287&r2=1659288&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
 Thu Feb 12 15:18:16 2015
@@ -144,7 +144,7 @@ public class ProtocolEngine_1_0_0_SASL i
 
     private State _state = State.A;
 
-    private volatile boolean _messageAssignmentSuspended;
+    private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
 
 
 
@@ -166,13 +166,14 @@ public class ProtocolEngine_1_0_0_SASL i
     @Override
     public boolean isMessageAssignmentSuspended()
     {
-        return _messageAssignmentSuspended;
+        Thread lock = _messageAssignmentSuspended.get();
+        return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread();
     }
 
     @Override
     public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
     {
-        _messageAssignmentSuspended = messageAssignmentSuspended;
+        _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
 
         if(!messageAssignmentSuspended)
         {

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java?rev=1659288&r1=1659287&r2=1659288&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
 Thu Feb 12 15:18:16 2015
@@ -290,7 +290,8 @@ public class SSLTest extends QpidBrokerT
         ByteArrayOutputStream bout = new ByteArrayOutputStream();
         e.printStackTrace(new PrintStream(bout));
         String strace = bout.toString();
-        assertTrue("Correct exception not thrown", strace.contains(expectedString));
+        assertTrue("Correct exception not thrown, expecting : " + expectedString + " got : " +e,
+                   strace.contains(expectedString));
     }
 
     public void testVerifyLocalHost() throws Exception

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java?rev=1659288&r1=1659287&r2=1659288&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
 Thu Feb 12 15:18:16 2015
@@ -147,6 +147,8 @@ public class QueueBrowserAutoAckTest ext
 
         assertEquals("Session reports Queue expectedDepth not as expected", expectedDepth, queueDepth);
 
+        getLogger().debug("KWDEBUG : About to check queue depth using browser");
+
 
         // Browse the queue to get a second opinion
         int msgCount = 0;



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to