Author: kwall
Date: Fri Mar 24 10:26:21 2017
New Revision: 1788401

URL: http://svn.apache.org/viewvc?rev=1788401&view=rev
Log:
QPID-7722: [0-10] Use same PrivilegedAction to execute a contiguous frames for 
the same channel

Modified:
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java?rev=1788401&r1=1788400&r2=1788401&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
 Fri Mar 24 10:26:21 2017
@@ -22,12 +22,18 @@ package org.apache.qpid.server.protocol.
 
 
 import java.nio.ByteBuffer;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +71,47 @@ public class ServerAssembler
         _segments = new HashMap<>();
     }
 
-    public void received(final ServerFrame event)
+    public void received(final List<ServerFrame> frames)
+    {
+        if (!frames.isEmpty())
+        {
+            PeekingIterator<ServerFrame> itr = 
Iterators.peekingIterator(frames.iterator());
+
+            while(itr.hasNext())
+            {
+                final ServerFrame frame = itr.next();
+                final int frameChannel = frame.getChannel();
+
+                ServerSession channel = _connection.getSession(frameChannel);
+                if (channel != null)
+                {
+                    final AccessControlContext context = 
channel.getAccessControllerContext();
+                    AccessController.doPrivileged((PrivilegedAction<Void>) () 
->
+                    {
+                        ServerFrame channelFrame = frame;
+                        boolean nextIsSameChannel;
+                        do
+                        {
+                            received(channelFrame);
+                            nextIsSameChannel = itr.hasNext() && frameChannel 
== itr.peek().getChannel();
+                            if (nextIsSameChannel)
+                            {
+                                channelFrame = itr.next();
+                            }
+                        }
+                        while (nextIsSameChannel);
+                        return null;
+                    }, context);
+                }
+                else
+                {
+                    received(frame);
+                }
+            }
+        }
+    }
+
+    private void received(final ServerFrame event)
     {
         if (!_connection.isIgnoreFutureInput())
         {

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1788401&r1=1788400&r2=1788401&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
 Fri Mar 24 10:26:21 2017
@@ -26,10 +26,7 @@ import static org.apache.qpid.server.pro
 import static 
org.apache.qpid.server.protocol.v0_10.ServerConnection.State.OPEN;
 
 import java.net.SocketAddress;
-import java.security.AccessControlContext;
-import java.security.AccessController;
 import java.security.Principal;
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -288,45 +285,15 @@ public class ServerConnection extends Co
     public void received(final ProtocolEvent event)
     {
         _lastIoTime.set(System.currentTimeMillis());
-        AccessControlContext context;
-        if (event.isConnectionControl())
-        {
-            context = _amqpConnection.getAccessControllerContext();
-        }
-        else
-        {
-            ServerSession channel = (ServerSession) 
getSession(event.getChannel());
-            if (channel != null)
-            {
-                context = channel.getAccessControllerContext();
-            }
-            else
-            {
-                context = _amqpConnection.getAccessControllerContext();
-            }
-        }
 
         if(!_ignoreAllButConnectionCloseOk || (event instanceof 
ConnectionCloseOk))
         {
-            AccessController.doPrivileged(new PrivilegedAction<Void>()
+            if(LOGGER.isDebugEnabled())
             {
-                @Override
-                public Void run()
-                {
-                    receivedSuper(event);
-                    return null;
-                }
-            }, context);
-        }
-    }
-
-    private void receivedSuper(ProtocolEvent event)
-    {
-        if(LOGGER.isDebugEnabled())
-        {
-            LOGGER.debug("RECV: [{}] {}", this, String.valueOf(event));
+                LOGGER.debug("RECV: [{}] {}", this, String.valueOf(event));
+            }
+            event.delegate(this, delegate);
         }
-        event.delegate(this, delegate);
     }
 
 

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java?rev=1788401&r1=1788400&r2=1788401&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java
 Fri Mar 24 10:26:21 2017
@@ -23,6 +23,10 @@ package org.apache.qpid.server.protocol.
 import static org.apache.qpid.server.transport.util.Functions.str;
 import static org.apache.qpid.server.protocol.v0_10.ServerInputHandler.State.*;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,111 +79,109 @@ public class ServerInputHandler implemen
     public void received(QpidByteBuffer buf)
     {
         int position = buf.position();
+
+        List<ServerFrame> frames = new ArrayList<>();
+
         while(buf.hasRemaining() && _state != ERROR)
         {
-            parse(buf);
-
-            int newPosition = buf.position();
-            if(position == newPosition)
-            {
-                break;
-            }
-            else
-            {
-                position = newPosition;
-            }
-        }
-    }
+            buf.mark();
+            switch (_state) {
+                case PROTO_HDR:
+                    if(buf.remaining() < 8)
+                    {
+                        break;
+                    }
+                    if (buf.get() != 'A' ||
+                        buf.get() != 'M' ||
+                        buf.get() != 'Q' ||
+                        buf.get() != 'P')
+                    {
+                        buf.reset();
+                        error("bad protocol header: %s", str(buf));
+                        _state = ERROR;
+                    }
+                    else
+                    {
+                        byte protoClass = buf.get();
+                        byte instance = buf.get();
+                        byte major = buf.get();
+                        byte minor = buf.get();
 
-    private void parse(QpidByteBuffer buffer)
-    {
-        buffer.mark();
-        switch (_state) {
-            case PROTO_HDR:
-                if(buffer.remaining() < 8)
-                {
+                        _serverAssembler.init(new ProtocolHeader(protoClass, 
instance, major, minor));
+                        _state = FRAME_HDR;
+                    }
                     break;
-                }
-                if (buffer.get() != 'A' ||
-                    buffer.get() != 'M' ||
-                    buffer.get() != 'Q' ||
-                    buffer.get() != 'P')
-                {
-                    buffer.reset();
-                    error("bad protocol header: %s", str(buffer));
-                    _state = ERROR;
-                }
-                else
-                {
-                    byte protoClass = buffer.get();
-                    byte instance = buffer.get();
-                    byte major = buffer.get();
-                    byte minor = buffer.get();
-
-                    _serverAssembler.init(new ProtocolHeader(protoClass, 
instance, major, minor));
-                    _state = FRAME_HDR;
-                }
-                break;
-            case FRAME_HDR:
-                if(buffer.remaining() < ServerFrame.HEADER_SIZE)
-                {
-                    buffer.reset();
-                }
-                else
-                {
-                    flags = buffer.get();
-                    type = SegmentType.get(buffer.get());
-                    int size = (0xFFFF & buffer.getShort());
-
-                    size -= ServerFrame.HEADER_SIZE;
-                    if (size < 0 || size > (_maxFrameSize - 
ServerFrame.HEADER_SIZE))
+                case FRAME_HDR:
+                    if(buf.remaining() < ServerFrame.HEADER_SIZE)
                     {
-                        error("bad frame size: %d", size);
-                        _state = ERROR;
+                        buf.reset();
                     }
                     else
                     {
-                        buffer.get(); // skip unused byte
-                        byte b = buffer.get();
-                        if ((b & 0xF0) != 0)
+                        flags = buf.get();
+                        type = SegmentType.get(buf.get());
+                        int size = (0xFFFF & buf.getShort());
+
+                        size -= ServerFrame.HEADER_SIZE;
+                        if (size < 0 || size > (_maxFrameSize - 
ServerFrame.HEADER_SIZE))
                         {
-                            error("non-zero reserved bits in upper nibble of " 
+
-                                  "frame header byte 5: '%x'", b);
+                            error("bad frame size: %d", size);
                             _state = ERROR;
                         }
                         else
                         {
-                            track = (byte) (b & 0xF);
-
-                            channel = (0xFFFF & buffer.getShort());
-                            buffer.position(buffer.position()+4);
-                            if (size == 0)
+                            buf.get(); // skip unused byte
+                            byte b = buf.get();
+                            if ((b & 0xF0) != 0)
                             {
-                                ServerFrame frame = new ServerFrame(flags, 
type, track, channel, EMPTY_BYTE_BUFFER.duplicate());
-                                _serverAssembler.received(frame);
-
-                            }
-                            else if (buffer.remaining() < size)
-                            {
-                                buffer.reset();
+                                error("non-zero reserved bits in upper nibble 
of " +
+                                      "frame header byte 5: '%x'", b);
+                                _state = ERROR;
                             }
                             else
                             {
-                                final QpidByteBuffer body = buffer.slice();
-                                body.limit(size);
-                                ServerFrame frame = new ServerFrame(flags, 
type, track, channel, body);
-                                buffer.position(buffer.position() + size);
+                                track = (byte) (b & 0xF);
 
-                                _serverAssembler.received(frame);
+                                channel = (0xFFFF & buf.getShort());
+                                buf.position(buf.position() + 4);
+                                if (size == 0)
+                                {
+                                    ServerFrame frame = new ServerFrame(flags, 
type, track, channel, EMPTY_BYTE_BUFFER.duplicate());
+                                    frames.add(frame);
+
+                                }
+                                else if (buf.remaining() < size)
+                                {
+                                    buf.reset();
+                                }
+                                else
+                                {
+                                    final QpidByteBuffer body = buf.slice();
+                                    body.limit(size);
+                                    ServerFrame frame = new ServerFrame(flags, 
type, track, channel, body);
+                                    frames.add(frame);
+                                    buf.position(buf.position() + size);
+                                }
                             }
                         }
                     }
-                }
+                    break;
+                default:
+                    throw new IllegalStateException();
+            }
+
+            int newPosition = buf.position();
+            if(position == newPosition)
+            {
                 break;
-            default:
-                throw new IllegalStateException();
+            }
+            else
+            {
+                position = newPosition;
+            }
         }
 
+        _serverAssembler.received(frames);
     }
 
     public void exception(Throwable t)



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to