Author: kwall
Date: Fri Mar 24 10:26:21 2017
New Revision: 1788401
URL: http://svn.apache.org/viewvc?rev=1788401&view=rev
Log:
QPID-7722: [0-10] Use same PrivilegedAction to execute contiguous frames for
the same channel
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java?rev=1788401&r1=1788400&r2=1788401&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
Fri Mar 24 10:26:21 2017
@@ -22,12 +22,18 @@ package org.apache.qpid.server.protocol.
import java.nio.ByteBuffer;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +71,47 @@ public class ServerAssembler
_segments = new HashMap<>();
}
- public void received(final ServerFrame event)
+ public void received(final List<ServerFrame> frames)
+ {
+ if (!frames.isEmpty())
+ {
+ PeekingIterator<ServerFrame> itr =
Iterators.peekingIterator(frames.iterator());
+
+ while(itr.hasNext())
+ {
+ final ServerFrame frame = itr.next();
+ final int frameChannel = frame.getChannel();
+
+ ServerSession channel = _connection.getSession(frameChannel);
+ if (channel != null)
+ {
+ final AccessControlContext context =
channel.getAccessControllerContext();
+ AccessController.doPrivileged((PrivilegedAction<Void>) ()
->
+ {
+ ServerFrame channelFrame = frame;
+ boolean nextIsSameChannel;
+ do
+ {
+ received(channelFrame);
+ nextIsSameChannel = itr.hasNext() && frameChannel
== itr.peek().getChannel();
+ if (nextIsSameChannel)
+ {
+ channelFrame = itr.next();
+ }
+ }
+ while (nextIsSameChannel);
+ return null;
+ }, context);
+ }
+ else
+ {
+ received(frame);
+ }
+ }
+ }
+ }
+
+ private void received(final ServerFrame event)
{
if (!_connection.isIgnoreFutureInput())
{
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1788401&r1=1788400&r2=1788401&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
Fri Mar 24 10:26:21 2017
@@ -26,10 +26,7 @@ import static org.apache.qpid.server.pro
import static
org.apache.qpid.server.protocol.v0_10.ServerConnection.State.OPEN;
import java.net.SocketAddress;
-import java.security.AccessControlContext;
-import java.security.AccessController;
import java.security.Principal;
-import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -288,45 +285,15 @@ public class ServerConnection extends Co
public void received(final ProtocolEvent event)
{
_lastIoTime.set(System.currentTimeMillis());
- AccessControlContext context;
- if (event.isConnectionControl())
- {
- context = _amqpConnection.getAccessControllerContext();
- }
- else
- {
- ServerSession channel = (ServerSession)
getSession(event.getChannel());
- if (channel != null)
- {
- context = channel.getAccessControllerContext();
- }
- else
- {
- context = _amqpConnection.getAccessControllerContext();
- }
- }
if(!_ignoreAllButConnectionCloseOk || (event instanceof
ConnectionCloseOk))
{
- AccessController.doPrivileged(new PrivilegedAction<Void>()
+ if(LOGGER.isDebugEnabled())
{
- @Override
- public Void run()
- {
- receivedSuper(event);
- return null;
- }
- }, context);
- }
- }
-
- private void receivedSuper(ProtocolEvent event)
- {
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("RECV: [{}] {}", this, String.valueOf(event));
+ LOGGER.debug("RECV: [{}] {}", this, String.valueOf(event));
+ }
+ event.delegate(this, delegate);
}
- event.delegate(this, delegate);
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java?rev=1788401&r1=1788400&r2=1788401&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java
Fri Mar 24 10:26:21 2017
@@ -23,6 +23,10 @@ package org.apache.qpid.server.protocol.
import static org.apache.qpid.server.transport.util.Functions.str;
import static org.apache.qpid.server.protocol.v0_10.ServerInputHandler.State.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,111 +79,109 @@ public class ServerInputHandler implemen
public void received(QpidByteBuffer buf)
{
int position = buf.position();
+
+ List<ServerFrame> frames = new ArrayList<>();
+
while(buf.hasRemaining() && _state != ERROR)
{
- parse(buf);
-
- int newPosition = buf.position();
- if(position == newPosition)
- {
- break;
- }
- else
- {
- position = newPosition;
- }
- }
- }
+ buf.mark();
+ switch (_state) {
+ case PROTO_HDR:
+ if(buf.remaining() < 8)
+ {
+ break;
+ }
+ if (buf.get() != 'A' ||
+ buf.get() != 'M' ||
+ buf.get() != 'Q' ||
+ buf.get() != 'P')
+ {
+ buf.reset();
+ error("bad protocol header: %s", str(buf));
+ _state = ERROR;
+ }
+ else
+ {
+ byte protoClass = buf.get();
+ byte instance = buf.get();
+ byte major = buf.get();
+ byte minor = buf.get();
- private void parse(QpidByteBuffer buffer)
- {
- buffer.mark();
- switch (_state) {
- case PROTO_HDR:
- if(buffer.remaining() < 8)
- {
+ _serverAssembler.init(new ProtocolHeader(protoClass,
instance, major, minor));
+ _state = FRAME_HDR;
+ }
break;
- }
- if (buffer.get() != 'A' ||
- buffer.get() != 'M' ||
- buffer.get() != 'Q' ||
- buffer.get() != 'P')
- {
- buffer.reset();
- error("bad protocol header: %s", str(buffer));
- _state = ERROR;
- }
- else
- {
- byte protoClass = buffer.get();
- byte instance = buffer.get();
- byte major = buffer.get();
- byte minor = buffer.get();
-
- _serverAssembler.init(new ProtocolHeader(protoClass,
instance, major, minor));
- _state = FRAME_HDR;
- }
- break;
- case FRAME_HDR:
- if(buffer.remaining() < ServerFrame.HEADER_SIZE)
- {
- buffer.reset();
- }
- else
- {
- flags = buffer.get();
- type = SegmentType.get(buffer.get());
- int size = (0xFFFF & buffer.getShort());
-
- size -= ServerFrame.HEADER_SIZE;
- if (size < 0 || size > (_maxFrameSize -
ServerFrame.HEADER_SIZE))
+ case FRAME_HDR:
+ if(buf.remaining() < ServerFrame.HEADER_SIZE)
{
- error("bad frame size: %d", size);
- _state = ERROR;
+ buf.reset();
}
else
{
- buffer.get(); // skip unused byte
- byte b = buffer.get();
- if ((b & 0xF0) != 0)
+ flags = buf.get();
+ type = SegmentType.get(buf.get());
+ int size = (0xFFFF & buf.getShort());
+
+ size -= ServerFrame.HEADER_SIZE;
+ if (size < 0 || size > (_maxFrameSize -
ServerFrame.HEADER_SIZE))
{
- error("non-zero reserved bits in upper nibble of "
+
- "frame header byte 5: '%x'", b);
+ error("bad frame size: %d", size);
_state = ERROR;
}
else
{
- track = (byte) (b & 0xF);
-
- channel = (0xFFFF & buffer.getShort());
- buffer.position(buffer.position()+4);
- if (size == 0)
+ buf.get(); // skip unused byte
+ byte b = buf.get();
+ if ((b & 0xF0) != 0)
{
- ServerFrame frame = new ServerFrame(flags,
type, track, channel, EMPTY_BYTE_BUFFER.duplicate());
- _serverAssembler.received(frame);
-
- }
- else if (buffer.remaining() < size)
- {
- buffer.reset();
+ error("non-zero reserved bits in upper nibble
of " +
+ "frame header byte 5: '%x'", b);
+ _state = ERROR;
}
else
{
- final QpidByteBuffer body = buffer.slice();
- body.limit(size);
- ServerFrame frame = new ServerFrame(flags,
type, track, channel, body);
- buffer.position(buffer.position() + size);
+ track = (byte) (b & 0xF);
- _serverAssembler.received(frame);
+ channel = (0xFFFF & buf.getShort());
+ buf.position(buf.position() + 4);
+ if (size == 0)
+ {
+ ServerFrame frame = new ServerFrame(flags,
type, track, channel, EMPTY_BYTE_BUFFER.duplicate());
+ frames.add(frame);
+
+ }
+ else if (buf.remaining() < size)
+ {
+ buf.reset();
+ }
+ else
+ {
+ final QpidByteBuffer body = buf.slice();
+ body.limit(size);
+ ServerFrame frame = new ServerFrame(flags,
type, track, channel, body);
+ frames.add(frame);
+ buf.position(buf.position() + size);
+ }
}
}
}
- }
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+
+ int newPosition = buf.position();
+ if(position == newPosition)
+ {
break;
- default:
- throw new IllegalStateException();
+ }
+ else
+ {
+ position = newPosition;
+ }
}
+ _serverAssembler.received(frames);
}
public void exception(Throwable t)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]