Author: rgodfrey
Date: Mon Oct 13 00:58:45 2014
New Revision: 1631275
URL: http://svn.apache.org/r1631275
Log:
Migrate broker to new direct method dispatch mechanism
Added:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
(with props)
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
(with props)
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
(with props)
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
(with props)
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java
(with props)
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ClientMethodProcessor.java
(with props)
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java
(with props)
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerMethodProcessor.java
(with props)
Removed:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Mon Oct 13 00:58:45 2014
@@ -115,7 +115,7 @@ import org.apache.qpid.transport.Transpo
public class AMQChannel
implements AMQSessionModel<AMQChannel, AMQProtocolEngine>,
AsyncAutoCommitTransaction.FutureRecorder,
- ChannelMethodProcessor
+ ServerChannelMethodProcessor
{
public static final int DEFAULT_PREFETCH = 4096;
@@ -376,27 +376,18 @@ public class AMQChannel
}
public void publishContentHeader(ContentHeaderBody contentHeaderBody)
- throws AMQException
{
- if (_currentMessage == null)
+ if (_logger.isDebugEnabled())
{
- throw new AMQException("Received content header without previously
receiving a BasicPublish frame");
+ _logger.debug("Content header received on channel " + _channelId);
}
- else
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Content header received on channel " +
_channelId);
- }
- _currentMessage.setContentHeaderBody(contentHeaderBody);
+ _currentMessage.setContentHeaderBody(contentHeaderBody);
- deliverCurrentMessageIfComplete();
- }
+ deliverCurrentMessageIfComplete();
}
private void deliverCurrentMessageIfComplete()
- throws AMQException
{
// check and deliver if header says body length is zero
if (_currentMessage.allContentReceived())
@@ -497,7 +488,7 @@ public class AMQChannel
* @throws AMQConnectionException if the message is mandatory
close-on-no-route
* @see AMQProtocolEngine#isCloseWhenNoRoute()
*/
- private void handleUnroutableMessage(AMQMessage message) throws
AMQConnectionException
+ private void handleUnroutableMessage(AMQMessage message)
{
boolean mandatory = message.isMandatory();
String description = currentMessageDescription();
@@ -512,26 +503,27 @@ public class AMQChannel
if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute())
{
- throw new AMQConnectionException(
- AMQConstant.NO_ROUTE,
- "No route for message " + currentMessageDescription(),
- 0, 0, // default class and method ids
- getConnection().getMethodRegistry(),
- (Throwable) null);
- }
-
- if (mandatory || message.isImmediate())
- {
- _transaction.addPostTransactionAction(new
WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " +
currentMessageDescription(), message));
+ _connection.closeConnection(AMQConstant.NO_ROUTE,
+ "No route for message " + currentMessageDescription(),
_channelId);
}
else
{
- AMQShortString exchangeName = _currentMessage.getExchangeName();
- AMQShortString routingKey =
_currentMessage.getMessagePublishInfo().getRoutingKey();
+ if (mandatory || message.isImmediate())
+ {
+ _transaction.addPostTransactionAction(new
WriteReturnAction(AMQConstant.NO_ROUTE,
+
"No Route for message "
+ +
currentMessageDescription(),
+
message));
+ }
+ else
+ {
+ AMQShortString exchangeName =
_currentMessage.getExchangeName();
+ AMQShortString routingKey =
_currentMessage.getMessagePublishInfo().getRoutingKey();
- getVirtualHost().getEventLogger().message(
- ExchangeMessages.DISCARDMSG(exchangeName == null ? null :
exchangeName.asString(),
- routingKey == null ? null :
routingKey.asString()));
+ getVirtualHost().getEventLogger().message(
+ ExchangeMessages.DISCARDMSG(exchangeName == null ?
null : exchangeName.asString(),
+ routingKey == null ? null
: routingKey.asString()));
+ }
}
}
@@ -550,13 +542,8 @@ public class AMQChannel
:
_currentMessage.getMessagePublishInfo().getRoutingKey().toString());
}
- public void publishContentBody(ContentBody contentBody) throws AMQException
+ public void publishContentBody(ContentBody contentBody)
{
- if (_currentMessage == null)
- {
- throw new AMQException("Received content body without previously
receiving a Content Header");
- }
-
if (_logger.isDebugEnabled())
{
_logger.debug(debugIdentity() + " content body received on channel
" + _channelId);
@@ -568,13 +555,6 @@ public class AMQChannel
deliverCurrentMessageIfComplete();
}
- catch (AMQException e)
- {
- // we want to make sure we don't keep a reference to the message
in the
- // event of an error
- _currentMessage = null;
- throw e;
- }
catch (RuntimeException e)
{
// we want to make sure we don't keep a reference to the message
in the
@@ -1277,14 +1257,10 @@ public class AMQChannel
private AMQMessage createAMQMessage(IncomingMessage incomingMessage,
StoredMessage<MessageMetaData> handle)
- throws AMQException
{
AMQMessage message = new AMQMessage(handle,
_connection.getReference());
- final BasicContentHeaderProperties properties =
- incomingMessage.getContentHeader().getProperties();
-
return message;
}
@@ -1340,6 +1316,11 @@ public class AMQChannel
return _subject;
}
+ public boolean hasCurrentMessage()
+ {
+ return _currentMessage != null;
+ }
+
private class GetDeliveryMethod implements ClientDeliveryMethod
{
@@ -2242,7 +2223,10 @@ public class AMQChannel
}
@Override
- public void receiveChannelClose()
+ public void receiveChannelClose(final int replyCode,
+ final AMQShortString replyText,
+ final int classId,
+ final int methodId)
{
sync();
_connection.closeChannel(this);
@@ -2258,6 +2242,43 @@ public class AMQChannel
}
@Override
+ public void receiveMessageContent(final byte[] data)
+ {
+
+ if(hasCurrentMessage())
+ {
+ publishContentBody(new ContentBody(data));
+ }
+ else
+ {
+ _connection.closeConnection(AMQConstant.COMMAND_INVALID,
+ "Attempt to send a content header
without first sending a publish frame",
+ _channelId);
+ }
+ }
+
+ @Override
+ public void receiveMessageHeader(final BasicContentHeaderProperties
properties, final long bodySize)
+ {
+ if(hasCurrentMessage())
+ {
+ publishContentHeader(new ContentHeaderBody(properties, bodySize));
+ }
+ else
+ {
+ _connection.closeConnection(AMQConstant.COMMAND_INVALID,
+ "Attempt to send a content header
without first sending a publish frame",
+ _channelId);
+ }
+ }
+
+ @Override
+ public boolean ignoreAllButCloseOk()
+ {
+ return _connection.ignoreAllButCloseOk() ||
_connection.channelAwaitingClosure(_channelId);
+ }
+
+ @Override
public void receiveChannelFlow(final boolean active)
{
sync();
@@ -2270,9 +2291,15 @@ public class AMQChannel
}
@Override
+ public void receiveChannelFlowOk(final boolean active)
+ {
+ // TODO - should we do anything here?
+ }
+
+ @Override
public void receiveExchangeBound(final AMQShortString exchangeName,
- final AMQShortString queueName,
- final AMQShortString routingKey)
+ final AMQShortString routingKey,
+ final AMQShortString queueName)
{
VirtualHostImpl virtualHost = _connection.getVirtualHost();
MethodRegistry methodRegistry = _connection.getMethodRegistry();
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
Mon Oct 13 00:58:45 2014
@@ -21,6 +21,9 @@
package org.apache.qpid.server.protocol.v0_8;
import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -28,8 +31,6 @@ import java.security.AccessControlExcept
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -49,7 +50,6 @@ import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQDecoder;
@@ -58,8 +58,6 @@ import org.apache.qpid.common.ServerProp
import org.apache.qpid.framing.*;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.connection.ConnectionPrincipal;
@@ -93,8 +91,7 @@ import org.apache.qpid.util.BytesDataOut
public class AMQProtocolEngine implements ServerProtocolEngine,
AMQConnectionModel<AMQProtocolEngine, AMQChannel>,
- AMQVersionAwareProtocolSession,
- ConnectionMethodProcessor
+
ServerMethodProcessor<ServerChannelMethodProcessor>
{
private static final Logger _logger =
Logger.getLogger(AMQProtocolEngine.class);
@@ -114,9 +111,9 @@ public class AMQProtocolEngine implement
private VirtualHostImpl<?,?,?> _virtualHost;
private final Map<Integer, AMQChannel> _channelMap =
- new HashMap<Integer, AMQChannel>();
+ new HashMap<>();
private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners
=
- new CopyOnWriteArrayList<SessionModelListener>();
+ new CopyOnWriteArrayList<>();
private final AMQChannel[] _cachedChannels = new
AMQChannel[CHANNEL_CACHE_SIZE + 1];
@@ -128,7 +125,7 @@ public class AMQProtocolEngine implement
* Thread-safety: guarded by {@link #_receivedLock}.
*/
private final Set<AMQChannel> _channelsForCurrentMessage =
- new HashSet<AMQChannel>();
+ new HashSet<>();
private AMQDecoder _decoder;
@@ -142,14 +139,12 @@ public class AMQProtocolEngine implement
/* AMQP Version for this session */
private ProtocolVersion _protocolVersion =
ProtocolVersion.getLatestSupportedVersion();
private final MethodRegistry _methodRegistry = new
MethodRegistry(_protocolVersion);
- private final FrameCreatingMethodProcessor _methodProcessor = new
FrameCreatingMethodProcessor(_protocolVersion);
private final List<Action<? super AMQProtocolEngine>> _taskList =
- new CopyOnWriteArrayList<Action<? super AMQProtocolEngine>>();
+ new CopyOnWriteArrayList<>();
- private Map<Integer, Long> _closingChannelsList = new
ConcurrentHashMap<Integer, Long>();
+ private Map<Integer, Long> _closingChannelsList = new
ConcurrentHashMap<>();
private ProtocolOutputConverter _protocolOutputConverter;
private final Subject _authorizedSubject = new Subject();
- private MethodDispatcher _dispatcher;
private final long _connectionID;
private Object _reference = new Object();
@@ -183,6 +178,8 @@ public class AMQProtocolEngine implement
private boolean _authenticated;
private boolean _compressionSupported;
private int _messageCompressionThreshold;
+ private int _currentClassId;
+ private int _currentMethodId;
public AMQProtocolEngine(Broker broker,
final NetworkConnection network,
@@ -195,7 +192,7 @@ public class AMQProtocolEngine implement
_transport = transport;
_maxNoOfChannels = broker.getConnection_sessionCountLimit();
_receivedLock = new ReentrantLock();
- _decoder = new AMQDecoder(true, _methodProcessor);
+ _decoder = new BrokerDecoder(this);
_connectionID = connectionId;
_logSubject = new ConnectionLogSubject(this);
@@ -306,32 +303,9 @@ public class AMQProtocolEngine implement
_readBytes += msg.remaining();
_receivedLock.lock();
- List<AMQDataBlock> processedMethods =
_methodProcessor.getProcessedMethods();
try
{
_decoder.decodeBuffer(msg);
- for (AMQDataBlock dataBlock : processedMethods)
- {
- try
- {
- dataBlockReceived(dataBlock);
- }
- catch(AMQConnectionException e)
- {
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Caught AMQConnectionException
but will simply stop processing data blocks - the connection should already be
closed.", e);
- }
- break;
- }
- catch (AMQException e)
- {
- _logger.error("Unexpected exception when
processing datablock", e);
- closeProtocolSession();
- break;
- }
- }
- processedMethods.clear();
receivedComplete();
}
catch (ConnectionScopedRuntimeException e)
@@ -361,7 +335,6 @@ public class AMQProtocolEngine implement
}
finally
{
- processedMethods.clear();
_receivedLock.unlock();
}
return null;
@@ -399,112 +372,10 @@ public class AMQProtocolEngine implement
}
}
- /**
- * Process the data block.
- * If the message is for a channel it is added to {@link
#_channelsForCurrentMessage}.
- *
- * @throws AMQConnectionException if unable to process the data block. In
this case,
- * the connection is already closed by the time the exception is thrown.
If any other
- * type of exception is thrown, the connection is not already closed.
- */
- private void dataBlockReceived(AMQDataBlock message) throws AMQException
- {
- if (message instanceof ProtocolInitiation)
- {
- protocolInitiationReceived((ProtocolInitiation) message);
-
- }
- else if (message instanceof AMQFrame)
- {
- AMQFrame frame = (AMQFrame) message;
- frameReceived(frame);
- }
- else
- {
- throw new AMQException("Unknown message type: " +
message.getClass().getName() + ": " + message);
- }
- }
-
- /**
- * Handle the supplied frame.
- * Adds this frame's channel to {@link #_channelsForCurrentMessage}.
- *
- * @throws AMQConnectionException if unable to process the data block. In
this case,
- * the connection is already closed by the time the exception is thrown.
If any other
- * type of exception is thrown, the connection is not already closed.
- */
- private void frameReceived(AMQFrame frame) throws AMQException
+ void channelRequiresSync(final AMQChannel amqChannel)
{
- int channelId = frame.getChannel();
- AMQChannel amqChannel = _channelMap.get(channelId);
- if(amqChannel != null)
- {
- // The _receivedLock is already acquired in the caller
- // It is safe to add channel
- _channelsForCurrentMessage.add(amqChannel);
- }
- else
- {
- // Not an error. The frame is probably a channel Open for this
channel id, which
- // does not require asynchronous work therefore its absence from
- // _channelsForCurrentMessage is ok.
- }
-
- AMQBody body = frame.getBodyFrame();
-
- long startTime = 0;
- String frameToString = null;
- if (_logger.isDebugEnabled())
- {
- startTime = System.currentTimeMillis();
- frameToString = frame.toString();
- _logger.debug("RECV: " + frame);
- }
-
- // Check that this channel is not closing
- if (channelAwaitingClosure(channelId))
- {
- if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure
- processing close-ok");
- }
- }
- else
- {
- // The channel has been told to close, we don't process any
more frames until
- // it's closed.
- return;
- }
- }
-
- try
- {
- body.handle(channelId, this);
- }
- catch(AMQConnectionException e)
- {
- _logger.info(e.getMessage() + " whilst processing frame: " + body);
- closeConnection(channelId, e);
- throw e;
- }
- catch (AMQException e)
- {
- closeChannel(channelId, e.getErrorCode() == null ?
AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage());
- throw e;
- }
- catch (TransportException e)
- {
- closeChannel(channelId, AMQConstant.CHANNEL_ERROR, e.getMessage());
- throw e;
- }
-
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Frame handled in " + (System.currentTimeMillis() -
startTime) + " ms. Frame: " + frameToString);
- }
+ _channelsForCurrentMessage.add(amqChannel);
}
private synchronized void protocolInitiationReceived(ProtocolInitiation pi)
@@ -623,148 +494,6 @@ public class AMQProtocolEngine implement
return buf;
}
- public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
- {
- final AMQMethodEvent<AMQMethodBody> evt = new
AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
-
- try
- {
- try
- {
- boolean wasAnyoneInterested = methodReceived(evt);
-
- if (!wasAnyoneInterested)
- {
- throw new AMQNoMethodHandlerException(evt);
- }
- }
- catch (AMQChannelException e)
- {
- if (getChannel(channelId) != null)
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Closing channel due to: " +
e.getMessage());
- }
-
- AMQConstant errorType = e.getErrorCode();
- if(errorType == null)
- {
- errorType = AMQConstant.INTERNAL_ERROR;
- }
- writeFrame(new AMQFrame(channelId,
-
getMethodRegistry().createChannelCloseBody(errorType.getCode(),
-
AMQShortString.validValueOf(e.getMessage()),
-
e.getClassId(),
-
e.getMethodId())));
- closeChannel(channelId, errorType, e.getMessage());
- }
- else
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("ChannelException occurred on
non-existent channel:" + e.getMessage());
- }
-
- if (_logger.isInfoEnabled())
- {
- _logger.info("Closing connection due to: " +
e.getMessage());
- }
-
- AMQConnectionException ce = new
AMQConnectionException(AMQConstant.CHANNEL_ERROR,
-
AMQConstant.CHANNEL_ERROR.getName().toString(),
-
methodBody, getMethodRegistry());
-
- _logger.info(e.getMessage() + " whilst processing:" +
methodBody);
- closeConnection(channelId, ce);
- }
- }
- catch (AMQConnectionException e)
- {
- _logger.info(e.getMessage() + " whilst processing:" +
methodBody);
- closeConnection(channelId, e);
- }
- }
- catch (Exception e)
- {
- _logger.error("Unexpected exception while processing frame.
Closing connection.", e);
-
- closeProtocolSession();
- }
- }
- private <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B>
evt) throws AMQException
- {
- final MethodDispatcher dispatcher = getMethodDispatcher();
-
- final int channelId = evt.getChannelId();
- final B body = evt.getMethod();
-
- final AMQChannel channel = getChannel(channelId);
- if(channelId != 0 && channel == null)
- {
-
- if(! ((body instanceof ChannelOpenBody)
- || (body instanceof ChannelCloseOkBody)
- || (body instanceof ChannelCloseBody)))
- {
- throw new AMQConnectionException(AMQConstant.CHANNEL_ERROR,
"channel is closed won't process:" + body, body, getMethodRegistry());
- }
-
- }
- if(channel == null)
- {
- return body.execute(dispatcher, channelId);
- }
- else
- {
- try
- {
- return Subject.doAs(channel.getSubject(), new
PrivilegedExceptionAction<Boolean>()
- {
- @Override
- public Boolean run() throws AMQException
- {
- return body.execute(dispatcher, channelId);
- }
- });
- }
- catch (PrivilegedActionException e)
- {
- if(e.getCause() instanceof AMQException)
- {
- throw (AMQException) e.getCause();
- }
- else
- {
- throw new ServerScopedRuntimeException(e.getCause());
- }
- }
-
-
- }
-
- }
-
- public void contentHeaderReceived(int channelId, ContentHeaderBody body)
throws AMQException
- {
-
- AMQChannel channel = getAndAssertChannel(channelId);
-
- channel.publishContentHeader(body);
-
- }
-
- public void contentBodyReceived(int channelId, ContentBody body) throws
AMQException
- {
- AMQChannel channel = getAndAssertChannel(channelId);
-
- channel.publishContentBody(body);
- }
-
- public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
- {
- // NO - OP
- }
/**
* Convenience method that writes a frame to the protocol session.
Equivalent to calling
@@ -808,19 +537,8 @@ public class AMQProtocolEngine implement
{
synchronized (_channelMap)
{
- return new ArrayList<AMQChannel>(_channelMap.values());
- }
- }
-
- public AMQChannel getAndAssertChannel(int channelId) throws AMQException
- {
- AMQChannel channel = getChannel(channelId);
- if (channel == null)
- {
- throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found
with id:" + channelId);
+ return new ArrayList<>(_channelMap.values());
}
-
- return channel;
}
public AMQChannel getChannel(int channelId)
@@ -899,8 +617,8 @@ public class AMQProtocolEngine implement
writeFrame(new AMQFrame(channel.getChannelId(),
getMethodRegistry().createChannelCloseBody(cause.getCode(),
AMQShortString.validValueOf(message),
-
_methodProcessor.getClassId(),
-
_methodProcessor.getMethodId())));
+
_currentClassId,
+
_currentMethodId)));
closeChannel(channel, cause, message, true);
}
@@ -1106,7 +824,7 @@ public class AMQProtocolEngine implement
{
_logger.info("Closing connection due to: " + message);
}
- closeConnection(channelId, new AMQFrame(0, new
ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(),
AMQShortString.validValueOf(message), _methodProcessor.getClassId(),
_methodProcessor.getMethodId())));
+ closeConnection(channelId, new AMQFrame(0, new
ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(),
AMQShortString.validValueOf(message), _currentClassId, _currentMethodId)));
}
private void closeConnection(int channelId, AMQFrame frame)
@@ -1224,9 +942,7 @@ public class AMQProtocolEngine implement
{
_protocolVersion = pv;
_methodRegistry.setProtocolVersion(_protocolVersion);
- _methodProcessor.setProtocolVersion(_protocolVersion);
_protocolOutputConverter = new ProtocolOutputConverterImpl(this);
- _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(this);
}
public byte getProtocolMajorVersion()
@@ -1335,11 +1051,6 @@ public class AMQProtocolEngine implement
return _methodRegistry;
}
- public MethodDispatcher getMethodDispatcher()
- {
- return _dispatcher;
- }
-
public void closed()
{
try
@@ -1353,14 +1064,10 @@ public class AMQProtocolEngine implement
closeProtocolSession();
}
}
- catch (ConnectionScopedRuntimeException e)
+ catch (ConnectionScopedRuntimeException | TransportException e)
{
_logger.error("Could not close protocol engine", e);
}
- catch (TransportException e)
- {
- _logger.error("Could not close protocol engine", e);
- }
}
public void readerIdle()
@@ -1427,11 +1134,6 @@ public class AMQProtocolEngine implement
}
}
- public void setSender(Sender<ByteBuffer> sender)
- {
- // Do nothing
- }
-
public long getReadBytes()
{
return _readBytes;
@@ -1572,7 +1274,7 @@ public class AMQProtocolEngine implement
public List<AMQChannel> getSessionModels()
{
- return new ArrayList<AMQChannel>(getChannels());
+ return new ArrayList<>(getChannels());
}
public LogSubject getLogSubject()
@@ -2074,4 +1776,52 @@ public class AMQProtocolEngine implement
return _broker.getEventLogger();
}
}
+
+ @Override
+ public ServerChannelMethodProcessor getChannelMethodProcessor(final int
channelId)
+ {
+ ServerChannelMethodProcessor channelMethodProcessor =
getChannel(channelId);
+ if(channelMethodProcessor == null)
+ {
+ channelMethodProcessor = (ServerChannelMethodProcessor)
Proxy.newProxyInstance(ServerMethodDispatcher.class.getClassLoader(),
+ new Class[] {
ServerChannelMethodProcessor.class }, new InvocationHandler()
+ {
+ @Override
+ public Object invoke(final Object proxy, final Method
method, final Object[] args)
+ throws Throwable
+ {
+ closeConnection(AMQConstant.CHANNEL_ERROR,
"Unknown channel id: " + channelId, channelId);
+
+ return null;
+ }
+ });
+ }
+ return channelMethodProcessor;
+ }
+
+ @Override
+ public void receiveHeartbeat()
+ {
+ // No op
+ }
+
+ @Override
+ public void receiveProtocolHeader(final ProtocolInitiation
protocolInitiation)
+ {
+ protocolInitiationReceived(protocolInitiation);
+ }
+
+ @Override
+ public void setCurrentMethod(final int classId, final int methodId)
+ {
+ _currentClassId = classId;
+ _currentMethodId = methodId;
+ }
+
+ @Override
+ public boolean ignoreAllButCloseOk()
+ {
+ return _closing.get();
+ }
+
}
Added:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java?rev=1631275&view=auto
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
(added)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
Mon Oct 13 00:58:45 2014
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import java.io.IOException;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.security.auth.Subject;
+
+import org.apache.qpid.codec.MarkableDataInput;
+import org.apache.qpid.codec.ServerDecoder;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
+
+public class BrokerDecoder extends ServerDecoder
+{
+ private final AMQProtocolEngine _connection;
+ /**
+ * Creates a new AMQP decoder.
+ *
+ * @param connection
+ */
+ public BrokerDecoder(final AMQProtocolEngine connection)
+ {
+ super(connection);
+ _connection = connection;
+ }
+
+ @Override
+ protected void processFrame(final int channelId, final byte type, final
long bodySize, final MarkableDataInput in)
+ throws AMQFrameDecodingException, IOException
+ {
+ Subject subject;
+ AMQChannel channel = _connection.getChannel(channelId);
+ if(channel == null)
+ {
+ subject = _connection.getSubject();
+ }
+ else
+ {
+ _connection.channelRequiresSync(channel);
+
+ subject = channel.getSubject();
+ }
+ try
+ {
+ Subject.doAs(subject, new PrivilegedExceptionAction<Object>()
+ {
+ @Override
+ public Void run() throws IOException, AMQFrameDecodingException
+ {
+ doProcessFrame(channelId, type, bodySize, in);
+ return null;
+ }
+ });
+ }
+ catch (PrivilegedActionException e)
+ {
+ Throwable cause = e.getCause();
+ if(cause instanceof IOException)
+ {
+ throw (IOException) cause;
+ }
+ else if(cause instanceof AMQFrameDecodingException)
+ {
+ throw (AMQFrameDecodingException) cause;
+ }
+ else if(cause instanceof RuntimeException)
+ {
+ throw (RuntimeException) cause;
+ }
+ else throw new ServerScopedRuntimeException(cause);
+ }
+
+ }
+
+
+ private void doProcessFrame(final int channelId, final byte type, final
long bodySize, final MarkableDataInput in)
+ throws AMQFrameDecodingException, IOException
+ {
+ super.processFrame(channelId, type, bodySize, in);
+
+ }
+
+}
Propchange:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
Mon Oct 13 00:58:45 2014
@@ -20,16 +20,15 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.AMQException;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.message.MessageDestination;
-import java.util.ArrayList;
-import java.util.List;
-
public class IncomingMessage
{
@@ -58,7 +57,7 @@ public class IncomingMessage
return _messagePublishInfo;
}
- public void addContentBodyFrame(final ContentBody contentChunk) throws
AMQException
+ public void addContentBodyFrame(final ContentBody contentChunk)
{
_bodyLengthReceived += contentChunk.getSize();
_contentChunks.add(contentChunk);
@@ -94,7 +93,7 @@ public class IncomingMessage
_messageDestination = e;
}
- public int getBodyCount() throws AMQException
+ public int getBodyCount()
{
return _contentChunks.size();
}
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
Mon Oct 13 00:58:45 2014
@@ -48,6 +48,7 @@ import org.apache.qpid.client.state.AMQS
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQDecoder;
+import org.apache.qpid.codec.ClientDecoder;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
@@ -193,7 +194,7 @@ public class AMQProtocolHandler implemen
_connection = con;
_protocolSession = new AMQProtocolSession(this, _connection);
_stateManager = new AMQStateManager(_protocolSession);
- _decoder = new AMQDecoder(false,
_protocolSession.getMethodProcessor());
+ _decoder = new ClientDecoder(_protocolSession.getMethodProcessor());
_failoverHandler = new FailoverHandler(this);
}
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
Mon Oct 13 00:58:45 2014
@@ -30,14 +30,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
-import org.apache.qpid.framing.AMQDataBlockDecoder;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.AMQProtocolVersionException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ByteArrayDataInput;
-import org.apache.qpid.framing.EncodingUtils;
-import org.apache.qpid.framing.MethodProcessor;
-import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.protocol.AMQConstant;
/**
* AMQDecoder delegates the decoding of AMQP either to a data block decoder,
or in the case of new connections, to a
@@ -51,12 +45,9 @@ import org.apache.qpid.framing.ProtocolI
* TODO If protocol initiation decoder not needed, then don't create it.
Probably not a big deal, but it adds to the
* per-session overhead.
*/
-public class AMQDecoder
+public abstract class AMQDecoder<T extends MethodProcessor>
{
- private final MethodProcessor _methodProcessor;
-
- /** Holds the 'normal' AMQP data decoder. */
- private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
+ private final T _methodProcessor;
/** Holds the protocol initiation decoder. */
private ProtocolInitiation.Decoder _piDecoder = new
ProtocolInitiation.Decoder();
@@ -67,6 +58,8 @@ public class AMQDecoder
private boolean _firstRead = true;
+ private int _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode();
+
private List<ByteArrayInputStream> _remainingBufs = new
ArrayList<ByteArrayInputStream>();
/**
@@ -75,7 +68,7 @@ public class AMQDecoder
* @param expectProtocolInitiation <tt>true</tt> if this decoder needs to
handle protocol initiation.
* @param methodProcessor method processor
*/
- public AMQDecoder(boolean expectProtocolInitiation, MethodProcessor
methodProcessor)
+ protected AMQDecoder(boolean expectProtocolInitiation, T methodProcessor)
{
_expectProtocolInitiation = expectProtocolInitiation;
_methodProcessor = methodProcessor;
@@ -96,7 +89,12 @@ public class AMQDecoder
public void setMaxFrameSize(final int frameMax)
{
- _dataBlockDecoder.setMaxFrameSize(frameMax);
+ _maxFrameSize = frameMax;
+ }
+
+ public T getMethodProcessor()
+ {
+ return _methodProcessor;
}
private class RemainingByteArrayInputStream extends InputStream
@@ -254,10 +252,10 @@ public class AMQDecoder
{
if(!_expectProtocolInitiation)
{
- enoughData = _dataBlockDecoder.decodable(msg);
+ enoughData = decodable(msg);
if (enoughData)
{
- _dataBlockDecoder.processInput(_methodProcessor, msg);
+ processInput(msg);
}
}
else
@@ -303,4 +301,105 @@ public class AMQDecoder
}
}
}
+
+ private boolean decodable(final MarkableDataInput in) throws
AMQFrameDecodingException, IOException
+ {
+ final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1);
+ // type, channel, body length and end byte
+ if (remainingAfterAttributes < 0)
+ {
+ return false;
+ }
+
+ in.mark(8);
+ in.skip(1 + 2);
+
+
+ // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt()
+ final long bodySize = in.readInt() & 0xffffffffL;
+ if (bodySize > _maxFrameSize)
+ {
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
+ "Incoming frame size of "
+ + bodySize
+ + " is larger than negotiated
maximum of "
+ + _maxFrameSize);
+ }
+ in.reset();
+
+ return (remainingAfterAttributes >= bodySize);
+
+ }
+
+ private void processInput(final MarkableDataInput in)
+ throws AMQFrameDecodingException, AMQProtocolVersionException,
IOException
+ {
+ final byte type = in.readByte();
+
+ final int channel = in.readUnsignedShort();
+ final long bodySize = EncodingUtils.readUnsignedInteger(in);
+
+ // bodySize can be zero
+ if ((channel < 0) || (bodySize < 0))
+ {
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
+ "Undecodable frame: type = " +
type + " channel = " + channel
+ + " bodySize = " + bodySize);
+ }
+
+ processFrame(channel, type, bodySize, in);
+
+ byte marker = in.readByte();
+ if ((marker & 0xFF) != 0xCE)
+ {
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
+ "End of frame marker not
found. Read " + marker + " length=" + bodySize
+ + " type=" + type);
+ }
+
+ }
+
+ protected void processFrame(final int channel, final byte type, final long
bodySize, final MarkableDataInput in)
+ throws AMQFrameDecodingException, IOException
+ {
+ switch (type)
+ {
+ case 1:
+ processMethod(channel, in);
+ break;
+ case 2:
+ ContentHeaderBody.process(in,
_methodProcessor.getChannelMethodProcessor(channel), bodySize);
+ break;
+ case 3:
+ ContentBody.process(in,
_methodProcessor.getChannelMethodProcessor(channel), bodySize);
+ break;
+ case 8:
+ HeartbeatBody.process(channel, in, _methodProcessor, bodySize);
+ break;
+ default:
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
"Unsupported frame type: " + type);
+ }
+ }
+
+
+ abstract void processMethod(int channelId,
+ MarkableDataInput in)
+ throws AMQFrameDecodingException, IOException;
+
+ AMQFrameDecodingException newUnknownMethodException(final int classId,
+ final int methodId,
+ ProtocolVersion
protocolVersion)
+ {
+ return new AMQFrameDecodingException(AMQConstant.COMMAND_INVALID,
+ "Method "
+ + methodId
+ + " unknown in AMQP version "
+ + protocolVersion
+ + " (while trying to decode class
"
+ + classId
+ + " method "
+ + methodId
+ + ".");
+ }
+
}
Added:
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java?rev=1631275&view=auto
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
(added)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
Mon Oct 13 00:58:45 2014
@@ -0,0 +1,258 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.codec;
+
+import java.io.IOException;
+
+import org.apache.qpid.framing.*;
+
+public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends
ClientChannelMethodProcessor>>
+{
+
+ /**
+ * Creates a new AMQP decoder.
+ *
+ * @param methodProcessor method processor
+ */
+ public ClientDecoder(final ClientMethodProcessor<? extends
ClientChannelMethodProcessor> methodProcessor)
+ {
+ super(false, methodProcessor);
+ }
+
+
+ void processMethod(int channelId,
+ MarkableDataInput in)
+ throws AMQFrameDecodingException, IOException
+ {
+ ClientMethodProcessor<? extends ClientChannelMethodProcessor>
methodProcessor = getMethodProcessor();
+ ClientChannelMethodProcessor channelMethodProcessor =
methodProcessor.getChannelMethodProcessor(channelId);
+ final int classAndMethod = in.readInt();
+ int classId = classAndMethod >> 16;
+ int methodId = classAndMethod & 0xFFFF;
+ methodProcessor.setCurrentMethod(classId, methodId);
+ try
+ {
+ switch (classAndMethod)
+ {
+ //CONNECTION_CLASS:
+ case 0x000a000a:
+ ConnectionStartBody.process(in, methodProcessor);
+ break;
+ case 0x000a0014:
+ ConnectionSecureBody.process(in, methodProcessor);
+ break;
+ case 0x000a001e:
+ ConnectionTuneBody.process(in, methodProcessor);
+ break;
+ case 0x000a0029:
+ ConnectionOpenOkBody.process(in, methodProcessor);
+ break;
+ case 0x000a002a:
+ ConnectionRedirectBody.process(in, methodProcessor);
+ break;
+ case 0x000a0032:
+ if
(methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ ConnectionRedirectBody.process(in, methodProcessor);
+ }
+ else
+ {
+ ConnectionCloseBody.process(in, methodProcessor);
+ }
+ break;
+ case 0x000a0033:
+ if
(methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ throw newUnknownMethodException(classId, methodId,
+
methodProcessor.getProtocolVersion());
+ }
+ else
+ {
+ methodProcessor.receiveConnectionCloseOk();
+ }
+ break;
+ case 0x000a003c:
+ if
(methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ ConnectionCloseBody.process(in, methodProcessor);
+ }
+ else
+ {
+ throw newUnknownMethodException(classId, methodId,
+
methodProcessor.getProtocolVersion());
+ }
+ break;
+ case 0x000a003d:
+ if
(methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ methodProcessor.receiveConnectionCloseOk();
+ }
+ else
+ {
+ throw newUnknownMethodException(classId, methodId,
+
methodProcessor.getProtocolVersion());
+ }
+ break;
+
+ // CHANNEL_CLASS:
+
+ case 0x0014000b:
+ ChannelOpenOkBody.process(in,
methodProcessor.getProtocolVersion(), channelMethodProcessor);
+ break;
+ case 0x00140014:
+ ChannelFlowBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00140015:
+ ChannelFlowOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x0014001e:
+ ChannelAlertBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00140028:
+ ChannelCloseBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00140029:
+ channelMethodProcessor.receiveChannelCloseOk();
+ break;
+
+ // ACCESS_CLASS:
+
+ case 0x001e000b:
+ AccessRequestOkBody.process(in, channelMethodProcessor);
+ break;
+
+ // EXCHANGE_CLASS:
+
+ case 0x0028000b:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveExchangeDeclareOk();
+ }
+ break;
+ case 0x00280015:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveExchangeDeleteOk();
+ }
+ break;
+ case 0x00280017:
+ ExchangeBoundOkBody.process(in, channelMethodProcessor);
+ break;
+
+
+ // QUEUE_CLASS:
+
+ case 0x0032000b:
+ QueueDeclareOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00320015:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveQueueBindOk();
+ }
+ break;
+ case 0x0032001f:
+ QueuePurgeOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00320029:
+ QueueDeleteOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00320033:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveQueueUnbindOk();
+ }
+ break;
+
+
+ // BASIC_CLASS:
+
+ case 0x003c000b:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveBasicQosOk();
+ }
+ break;
+ case 0x003c0015:
+ BasicConsumeOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c001f:
+ BasicCancelOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0032:
+ BasicReturnBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c003c:
+ BasicDeliverBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0047:
+ BasicGetOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0048:
+ BasicGetEmptyBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0065:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveBasicRecoverSyncOk();
+ }
+ break;
+ case 0x003c006f:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveBasicRecoverSyncOk();
+ }
+ break;
+
+ // TX_CLASS:
+
+ case 0x005a000b:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveTxSelectOk();
+ }
+ break;
+ case 0x005a0015:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveTxCommitOk();
+ }
+ break;
+ case 0x005a001f:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveTxRollbackOk();
+ }
+ break;
+
+ default:
+ throw newUnknownMethodException(classId, methodId,
+
methodProcessor.getProtocolVersion());
+
+ }
+ }
+ finally
+ {
+ methodProcessor.setCurrentMethod(0, 0);
+ }
+ }
+
+}
Propchange:
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java?rev=1631275&view=auto
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
(added)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
Mon Oct 13 00:58:45 2014
@@ -0,0 +1,234 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.codec;
+
+import java.io.IOException;
+
+import org.apache.qpid.framing.*;
+
+public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends
ServerChannelMethodProcessor>>
+{
+
+ /**
+ * Creates a new AMQP decoder.
+ *
+ * @param methodProcessor method processor
+ */
+ public ServerDecoder(final ServerMethodProcessor<? extends
ServerChannelMethodProcessor> methodProcessor)
+ {
+ super(true, methodProcessor);
+ }
+
+ void processMethod(int channelId,
+ MarkableDataInput in)
+ throws AMQFrameDecodingException, IOException
+ {
+ ServerMethodProcessor<? extends ServerChannelMethodProcessor>
methodProcessor = getMethodProcessor();
+ ServerChannelMethodProcessor channelMethodProcessor =
methodProcessor.getChannelMethodProcessor(channelId);
+ final int classAndMethod = in.readInt();
+ int classId = classAndMethod >> 16;
+ int methodId = classAndMethod & 0xFFFF;
+ methodProcessor.setCurrentMethod(classId, methodId);
+ try
+ {
+ switch (classAndMethod)
+ {
+ //CONNECTION_CLASS:
+ case 0x000a000b:
+ ConnectionStartOkBody.process(in, methodProcessor);
+ break;
+ case 0x000a0015:
+ ConnectionSecureOkBody.process(in, methodProcessor);
+ break;
+ case 0x000a001f:
+ ConnectionTuneOkBody.process(in, methodProcessor);
+ break;
+ case 0x000a0028:
+ ConnectionOpenBody.process(in, methodProcessor);
+ break;
+ case 0x000a0032:
+ if
(methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ throw newUnknownMethodException(classId, methodId,
+
methodProcessor.getProtocolVersion());
+ }
+ else
+ {
+ ConnectionCloseBody.process(in, methodProcessor);
+ }
+ break;
+ case 0x000a0033:
+ if
(methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ throw newUnknownMethodException(classId, methodId,
+
methodProcessor.getProtocolVersion());
+ }
+ else
+ {
+ methodProcessor.receiveConnectionCloseOk();
+ }
+ break;
+ case 0x000a003c:
+ if
(methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ ConnectionCloseBody.process(in, methodProcessor);
+ }
+ else
+ {
+ throw newUnknownMethodException(classId, methodId,
+
methodProcessor.getProtocolVersion());
+ }
+ break;
+ case 0x000a003d:
+ if
(methodProcessor.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ methodProcessor.receiveConnectionCloseOk();
+ }
+ else
+ {
+ throw newUnknownMethodException(classId, methodId,
+
methodProcessor.getProtocolVersion());
+ }
+ break;
+
+ // CHANNEL_CLASS:
+
+ case 0x0014000a:
+ ChannelOpenBody.process(channelId, in, methodProcessor);
+ break;
+ case 0x00140014:
+ ChannelFlowBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00140015:
+ ChannelFlowOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00140028:
+ ChannelCloseBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00140029:
+ channelMethodProcessor.receiveChannelCloseOk();
+ break;
+
+ // ACCESS_CLASS:
+
+ case 0x001e000a:
+ AccessRequestBody.process(in, channelMethodProcessor);
+ break;
+
+ // EXCHANGE_CLASS:
+
+ case 0x0028000a:
+ ExchangeDeclareBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00280014:
+ ExchangeDeleteBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00280016:
+ ExchangeBoundBody.process(in, channelMethodProcessor);
+ break;
+
+
+ // QUEUE_CLASS:
+
+ case 0x0032000a:
+ QueueDeclareBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00320014:
+ QueueBindBody.process(in, channelMethodProcessor);
+ break;
+ case 0x0032001e:
+ QueuePurgeBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00320028:
+ QueueDeleteBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00320032:
+ QueueUnbindBody.process(in, channelMethodProcessor);
+ break;
+
+
+ // BASIC_CLASS:
+
+ case 0x003c000a:
+ BasicQosBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0014:
+ BasicConsumeBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c001e:
+ BasicCancelBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0028:
+ BasicPublishBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0046:
+ BasicGetBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0050:
+ BasicAckBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c005a:
+ BasicRejectBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0064:
+ BasicRecoverBody.process(in,
methodProcessor.getProtocolVersion(), channelMethodProcessor);
+ break;
+ case 0x003c0066:
+ BasicRecoverSyncBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c006e:
+ BasicRecoverSyncBody.process(in, channelMethodProcessor);
+ break;
+
+ // TX_CLASS:
+
+ case 0x005a000a:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveTxSelect();
+ }
+ break;
+ case 0x005a0014:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveTxCommit();
+ }
+ break;
+ case 0x005a001e:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveTxRollback();
+ }
+ break;
+
+ default:
+ throw newUnknownMethodException(classId, methodId,
+
methodProcessor.getProtocolVersion());
+
+ }
+ }
+ finally
+ {
+ methodProcessor.setCurrentMethod(0, 0);
+ }
+ }
+
+}
Propchange:
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
Mon Oct 13 00:58:45 2014
@@ -165,9 +165,8 @@ public class AccessRequestBody extends A
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws
IOException
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher)
throws IOException
{
AMQShortString realm = buffer.readAMQShortString();
byte bitfield = buffer.readByte();
@@ -176,6 +175,9 @@ public class AccessRequestBody extends A
boolean active = (bitfield & 0x04) == 0x4 ;
boolean write = (bitfield & 0x08) == 0x8 ;
boolean read = (bitfield & 0x10) == 0x10 ;
- dispatcher.receiveAccessRequest(channelId, realm, exclusive, passive,
active, write, read);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveAccessRequest(realm, exclusive, passive, active,
write, read);
+ }
}
}
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
Mon Oct 13 00:58:45 2014
@@ -95,10 +95,14 @@ public class AccessRequestOkBody extends
return buf.toString();
}
- public static void process(final int channelId, final MarkableDataInput
buffer, final MethodProcessor dispatcher)
+ public static void process(final MarkableDataInput buffer,
+ final ClientChannelMethodProcessor dispatcher)
throws IOException
{
int ticket = buffer.readUnsignedShort();
- dispatcher.receiveAccessRequestOk(channelId, ticket);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveAccessRequestOk(ticket);
+ }
}
}
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
Mon Oct 13 00:58:45 2014
@@ -112,13 +112,15 @@ public class BasicAckBody extends AMQMet
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws
IOException
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher)
throws IOException
{
long deliveryTag = buffer.readLong();
boolean multiple = (buffer.readByte() & 0x01) != 0;
- dispatcher.receiveBasicAck(channelId, deliveryTag, multiple);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicAck(deliveryTag, multiple);
+ }
}
}
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
Mon Oct 13 00:58:45 2014
@@ -113,13 +113,15 @@ public class BasicCancelBody extends AMQ
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws
IOException
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher)
throws IOException
{
AMQShortString consumerTag = buffer.readAMQShortString();
boolean noWait = (buffer.readByte() & 0x01) == 0x01;
- dispatcher.receiveBasicCancel(channelId, consumerTag, noWait);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicCancel(consumerTag, noWait);
+ }
}
}
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
Mon Oct 13 00:58:45 2014
@@ -96,10 +96,14 @@ public class BasicCancelOkBody extends A
return buf.toString();
}
- public static void process(final int channelId, final MarkableDataInput
in, final MethodProcessor dispatcher)
+ public static void process(final MarkableDataInput in,
+ final ClientChannelMethodProcessor dispatcher)
throws IOException
{
AMQShortString consumerTag = in.readAMQShortString();
- dispatcher.receiveBasicCancelOk(channelId, consumerTag);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicCancelOk(consumerTag);
+ }
}
}
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
Mon Oct 13 00:58:45 2014
@@ -191,7 +191,8 @@ public class BasicConsumeBody extends AM
return buf.toString();
}
- public static void process(final int channelId, final MarkableDataInput
buffer, final MethodProcessor dispatcher)
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher)
throws IOException, AMQFrameDecodingException
{
@@ -205,6 +206,9 @@ public class BasicConsumeBody extends AM
boolean exclusive = (bitfield & 0x04) == 0x04;
boolean nowait = (bitfield & 0x08) == 0x08;
FieldTable arguments = EncodingUtils.readFieldTable(buffer);
- dispatcher.receiveBasicConsume(channelId, queue, consumerTag, noLocal,
noAck, exclusive, nowait, arguments);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicConsume(queue, consumerTag, noLocal, noAck,
exclusive, nowait, arguments);
+ }
}
}
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
Mon Oct 13 00:58:45 2014
@@ -96,10 +96,14 @@ public class BasicConsumeOkBody extends
return buf.toString();
}
- public static void process(final int channelId, final MarkableDataInput
buffer, final MethodProcessor dispatcher)
+ public static void process(final MarkableDataInput buffer,
+ final ClientChannelMethodProcessor dispatcher)
throws IOException
{
AMQShortString consumerTag = buffer.readAMQShortString();
- dispatcher.receiveBasicConsumeOk(channelId, consumerTag);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicConsumeOk(consumerTag);
+ }
}
}
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
Mon Oct 13 00:58:45 2014
@@ -152,9 +152,8 @@ public class BasicDeliverBody extends AM
return buf.toString();
}
- public static void process(final int channelId,
- final MarkableDataInput buffer,
- final MethodProcessor dispatcher) throws
IOException
+ public static void process(final MarkableDataInput buffer,
+ final ClientChannelMethodProcessor dispatcher)
throws IOException
{
AMQShortString consumerTag = buffer.readAMQShortString();
@@ -162,6 +161,9 @@ public class BasicDeliverBody extends AM
boolean redelivered = (buffer.readByte() & 0x01) != 0;
AMQShortString exchange = buffer.readAMQShortString();
AMQShortString routingKey = buffer.readAMQShortString();
- dispatcher.receiveBasicDeliver(channelId, consumerTag, deliveryTag,
redelivered, exchange, routingKey);
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicDeliver(consumerTag, deliveryTag,
redelivered, exchange, routingKey);
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]