Common Network Layer Interface
Page edited by Aidan Skinner

Purpose
This design page describes the low-level design for the new interface, which is aimed at encapsulating the network code in both the Java Broker and Client. This is the first step in decoupling the existing IO layer from the surrounding Qpid code and, more specifically, from the current tie-in to MINA. This document will provide sufficient information for architecture review and for input to task breakdown and planning.

Interface Requirements
Design Details
Current implementation
Inside Qpid, data is read from a socket and placed in a buffer. A separate thread then takes this buffer and attempts to parse it as an AMQP command. This AMQP command is then put on a second buffer. Finally, a third thread reads the command and processes it. Currently the two buffers between these three threads are unbounded. This means that data is read from the network as fast as possible, with no regard for whether the broker has the capacity to process it.

Queues are themselves a kind of buffer between client applications. From a queue, a message can be assigned to be sent to a client. At this point a delivery command is placed in another buffer, awaiting sending on the network. When the command is received by the client, a process similar to receiving on the broker occurs.

The whole process looks something like this:
Client App sends message -> (MINA Buffer)
Or, pictorially:

Of all the buffers above, only the TCP buffers are bounded. (The Delivery Queue Buffer in the client is potentially bounded by prefetch, although prefetch is set on messages, which may be of arbitrary size, rather than on bytes.) Every other buffer is a potential source of out-of-memory errors.

From the above we can see that there are many potential sources of OutOfMemoryError. We need to consider where we may get unbounded growth, which scenarios will cause it, and what other ways we have to mitigate those risks. In general, the IO (MINA) buffers grow when sender and receiver (i.e. the Client and Broker) operate at mismatched rates. The queue grows without bound if the sending client produces at a faster rate than the receiving client can process.

Issues
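The unbounded-growth problem described above can be reproduced in miniature. The following is only an illustrative sketch, not Qpid code: `BufferGrowthSketch` and `fill` are hypothetical names, and the standard `java.util.concurrent` queues stand in for the buffers between the reader and parser threads. With no consumer running, an unbounded queue accepts every frame offered to it, whereas a bounded queue refuses further frames once it is full, which is the point at which a reader could stop reading from the socket.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BufferGrowthSketch
{
    // Offers 'frames' buffers to the queue while nothing is consuming from it
    // and returns how many of them the queue accepted.
    public static int fill(BlockingQueue<byte[]> queue, int frames)
    {
        int accepted = 0;
        for (int i = 0; i < frames; i++)
        {
            // offer() never blocks: it returns false when a bounded queue is full
            if (queue.offer(new byte[16]))
            {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args)
    {
        // Unbounded: every frame is retained, so memory use tracks the producer.
        System.out.println("unbounded accepted " + fill(new LinkedBlockingQueue<byte[]>(), 1000));
        // Bounded to 8 entries: the producer is told to back off after 8 frames.
        System.out.println("bounded accepted " + fill(new ArrayBlockingQueue<byte[]>(8), 1000));
    }
}
```

Note that this bounds the number of entries, not the number of bytes, so it shares the limitation mentioned above for prefetch.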
Current implementation
The broker's network layer is implemented in the following way:
The client opens a connection like this:
and further processing looks like this:

Data Flow
At the start of a connection the NetworkDriver will pass data to a ProtocolEngine, which will handle protocol negotiation and return the ProtocolEngine to use, or throw an exception if no driver is available. The implementation will use the existing Sender and Receiver interfaces in org.apache.qpid.transport, which will allow the use of the existing alternate transport layer implementations.

The thread of control remains with the network driver. Data comes in from the operating system, is read from the socket by the NetworkDriver and given to the ProtocolEngine's received method. The ProtocolEngine is responsible for processing the bytes and interfacing to the rest of the broker or client. The ProtocolEngine will write bytes to the wire using the NetworkDriver, which implements the Sender interface.

Sender:
public interface Sender<T>
{
void setIdleTimeout(long l);
void send(T msg);
void flush();
void close();
}
The ProtocolEngine will implement the Receiver interface to be given bytes by the NetworkDriver.

Receiver:
public interface Receiver<T>
{
void received(T msg);
void exception(Throwable t);
void closed();
}
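As a rough illustration of how the two interfaces fit together, the sketch below wires a toy Receiver to a toy Sender entirely in memory. `RecordingSender` and `EchoReceiver` are hypothetical names used only here, and the choice of `ByteBuffer` as the type parameter is an assumption. The Sender and Receiver interfaces are repeated as nested types so that the example compiles on its own.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class SenderReceiverSketch
{
    // Copies of the two interfaces, nested so the sketch is self-contained.
    public interface Sender<T>
    {
        void setIdleTimeout(long l);
        void send(T msg);
        void flush();
        void close();
    }

    public interface Receiver<T>
    {
        void received(T msg);
        void exception(Throwable t);
        void closed();
    }

    // Hypothetical stand-in for the network side: send() only queues a buffer,
    // flush() moves everything queued so far to the "wire".
    public static class RecordingSender implements Sender<ByteBuffer>
    {
        public final List<ByteBuffer> pending = new ArrayList<ByteBuffer>();
        public final List<ByteBuffer> flushed = new ArrayList<ByteBuffer>();
        public boolean closed = false;

        public void setIdleTimeout(long l) { /* not modelled in this sketch */ }
        public void send(ByteBuffer msg) { pending.add(msg); }
        public void flush() { flushed.addAll(pending); pending.clear(); }
        public void close() { flush(); closed = true; }
    }

    // Hypothetical stand-in for the protocol side: echoes every buffer it is given.
    public static class EchoReceiver implements Receiver<ByteBuffer>
    {
        private final Sender<ByteBuffer> sender;

        public EchoReceiver(Sender<ByteBuffer> sender) { this.sender = sender; }

        public void received(ByteBuffer msg)
        {
            // duplicate() shares the bytes but leaves the caller's position untouched
            sender.send(msg.duplicate());
            sender.flush();
        }

        public void exception(Throwable t) { sender.close(); }
        public void closed() { sender.close(); }
    }

    public static void main(String[] args)
    {
        RecordingSender wire = new RecordingSender();
        EchoReceiver engine = new EchoReceiver(wire);
        // A network driver would call received() for each chunk it reads.
        engine.received(ByteBuffer.wrap(new byte[] {1, 2, 3}));
        engine.closed();
        System.out.println(wire.flushed.size() + " buffer(s) written, closed=" + wire.closed);
    }
}
```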
The ProtocolEngine will implement the following interface:
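import java.net.SocketAddress;

public interface ProtocolEngine extends Receiver
{
// Supplies the Sender that the engine uses to write bytes to the wire
void setSender(Sender sender);
// Address of the peer at the other end of the connection
SocketAddress getRemoteAddress();
long getWrittenBytes();
long getReadBytes();
void close();
}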
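A minimal sketch of an implementation may help show how the byte counters and the Sender hand-off are meant to interact. Everything here is an assumption made for illustration: `AckEngine` and `CountingSender` are hypothetical names, the interfaces are parameterised with `ByteBuffer`, ProtocolEngine is written as extending Receiver, and `close()` is taken to return nothing. The engine acknowledges each received buffer with a single byte, so the read and written counts differ.

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;

public class ProtocolEngineSketch
{
    // Local copies of the interfaces so the sketch compiles on its own.
    public interface Sender<T>
    {
        void setIdleTimeout(long l);
        void send(T msg);
        void flush();
        void close();
    }

    public interface Receiver<T>
    {
        void received(T msg);
        void exception(Throwable t);
        void closed();
    }

    // Assumed shape of ProtocolEngine (see the lead-in for the assumptions made).
    public interface ProtocolEngine extends Receiver<ByteBuffer>
    {
        void setSender(Sender<ByteBuffer> sender);
        SocketAddress getRemoteAddress();
        long getWrittenBytes();
        long getReadBytes();
        void close();
    }

    // Hypothetical Sender that only counts the bytes it is asked to send.
    public static class CountingSender implements Sender<ByteBuffer>
    {
        public long sent;
        public boolean closed;

        public void setIdleTimeout(long l) { /* ignored in this sketch */ }
        public void send(ByteBuffer msg) { sent += msg.remaining(); }
        public void flush() { /* nothing is buffered */ }
        public void close() { closed = true; }
    }

    // Hypothetical engine: replies to every received buffer with a one-byte
    // acknowledgement and keeps the read/written counters up to date.
    public static class AckEngine implements ProtocolEngine
    {
        private final SocketAddress remote;
        private Sender<ByteBuffer> sender;
        private long read;
        private long written;

        public AckEngine(SocketAddress remote) { this.remote = remote; }

        public void setSender(Sender<ByteBuffer> sender) { this.sender = sender; }
        public SocketAddress getRemoteAddress() { return remote; }
        public long getWrittenBytes() { return written; }
        public long getReadBytes() { return read; }

        public void received(ByteBuffer msg)
        {
            read += msg.remaining();
            ByteBuffer ack = ByteBuffer.wrap(new byte[] {0x06});
            written += ack.remaining();
            sender.send(ack);
            sender.flush();
        }

        public void exception(Throwable t) { close(); }
        public void closed() { /* the peer has gone away; nothing left to write */ }
        public void close() { if (sender != null) { sender.close(); } }
    }

    public static void main(String[] args)
    {
        CountingSender wire = new CountingSender();
        // createUnresolved avoids a DNS lookup; host and port are placeholders
        AckEngine engine = new AckEngine(InetSocketAddress.createUnresolved("localhost", 5672));
        engine.setSender(wire);
        engine.received(ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5}));
        System.out.println("read=" + engine.getReadBytes() + " written=" + engine.getWrittenBytes());
        engine.close();
    }
}
```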
The NetworkDriver will implement the following interface:
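import java.net.InetAddress;

public interface NetworkDriver extends Sender
{
void setNetworkDriverConfiguration(NetworkDriverConfiguration config);
// Opens a connection to the given destination
void open(InetAddress destination);
// Binds to the given port on the given addresses
void bind(int port, InetAddress[] addresses);
}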
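To illustrate the idea that the thread of control stays with the driver, here is a simplified sketch of a driver-like class built on plain `java.io` streams instead of sockets. It is not the proposed NetworkDriver: `StreamDriver`, `readLoop` and `CollectingReceiver` are hypothetical names, `open` and `bind` are left out because no real connection is made, and the four-byte read buffer is deliberately tiny so that one input is delivered in several chunks.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StreamDriverSketch
{
    // Local copies of the interfaces so the sketch compiles on its own.
    public interface Sender<T>
    {
        void setIdleTimeout(long l);
        void send(T msg);
        void flush();
        void close();
    }

    public interface Receiver<T>
    {
        void received(T msg);
        void exception(Throwable t);
        void closed();
    }

    // Hypothetical driver: owns the read loop and doubles as the Sender.
    public static class StreamDriver implements Sender<ByteBuffer>
    {
        private final InputStream in;
        private final OutputStream out;
        private final Receiver<ByteBuffer> receiver;

        public StreamDriver(InputStream in, OutputStream out, Receiver<ByteBuffer> receiver)
        {
            this.in = in;
            this.out = out;
            this.receiver = receiver;
        }

        // Runs on the driver's thread: read a chunk, hand it over, repeat until end of stream.
        public void readLoop()
        {
            byte[] chunk = new byte[4];
            try
            {
                int n;
                while ((n = in.read(chunk)) != -1)
                {
                    // Copy the bytes so the receiver may keep the buffer safely
                    receiver.received(ByteBuffer.wrap(Arrays.copyOf(chunk, n)));
                }
                receiver.closed();
            }
            catch (IOException e)
            {
                receiver.exception(e);
            }
        }

        public void setIdleTimeout(long l) { /* not modelled in this sketch */ }

        public void send(ByteBuffer msg)
        {
            byte[] bytes = new byte[msg.remaining()];
            msg.duplicate().get(bytes); // duplicate so the caller's position is untouched
            try { out.write(bytes); } catch (IOException e) { receiver.exception(e); }
        }

        public void flush() { try { out.flush(); } catch (IOException e) { receiver.exception(e); } }
        public void close() { try { out.close(); } catch (IOException e) { receiver.exception(e); } }
    }

    // Hypothetical receiver that records the size of each chunk it is given.
    public static class CollectingReceiver implements Receiver<ByteBuffer>
    {
        public final List<Integer> chunkSizes = new ArrayList<Integer>();
        public boolean closed;
        public Throwable failure;

        public void received(ByteBuffer msg) { chunkSizes.add(msg.remaining()); }
        public void exception(Throwable t) { failure = t; }
        public void closed() { closed = true; }
    }

    public static void main(String[] args)
    {
        CollectingReceiver receiver = new CollectingReceiver();
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        StreamDriver driver = new StreamDriver(new ByteArrayInputStream(new byte[10]), wire, receiver);
        driver.readLoop();
        System.out.println("chunks=" + receiver.chunkSizes + " closed=" + receiver.closed);
    }
}
```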
The NetworkDriverConfiguration interface provides configuration data for the NetworkDriver:
public interface NetworkDriverConfiguration
{
// Taken from Socket
boolean getKeepAlive();
boolean getOOBInline();
int getReceiveBufferSize();
boolean getReuseAddress();
int getSendBufferSize();
// As with Socket.getSoLinger(), -1 means the option is disabled
int getSoLinger();
int getSoTimeout();
boolean getTcpNoDelay();
int getTrafficClass();
}
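One way a driver might consume this configuration is to copy each value onto a `java.net.Socket` before connecting. The sketch below assumes that the configuration methods are plain getters (no arguments, returning the value) and that a negative linger value means the option is off; `apply` and `FixedConfiguration` are hypothetical names, and the values in `FixedConfiguration` are arbitrary examples rather than recommended settings.

```java
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;

public class SocketConfigSketch
{
    // Assumed getter-style form of the configuration interface.
    public interface NetworkDriverConfiguration
    {
        boolean getKeepAlive();
        boolean getOOBInline();
        int getReceiveBufferSize();
        boolean getReuseAddress();
        int getSendBufferSize();
        int getSoLinger();      // assumption: negative means linger is off
        int getSoTimeout();
        boolean getTcpNoDelay();
        int getTrafficClass();
    }

    // Hypothetical configuration with arbitrary example values.
    public static class FixedConfiguration implements NetworkDriverConfiguration
    {
        public boolean getKeepAlive() { return true; }
        public boolean getOOBInline() { return false; }
        public int getReceiveBufferSize() { return 32768; }
        public boolean getReuseAddress() { return true; }
        public int getSendBufferSize() { return 32768; }
        public int getSoLinger() { return -1; }
        public int getSoTimeout() { return 5000; }
        public boolean getTcpNoDelay() { return true; }
        public int getTrafficClass() { return 0; }
    }

    // Copies every option from the configuration onto the socket.
    public static void apply(NetworkDriverConfiguration config, Socket socket) throws SocketException
    {
        socket.setKeepAlive(config.getKeepAlive());
        socket.setOOBInline(config.getOOBInline());
        socket.setReceiveBufferSize(config.getReceiveBufferSize()); // a hint to the OS
        socket.setReuseAddress(config.getReuseAddress());
        socket.setSendBufferSize(config.getSendBufferSize());       // a hint to the OS
        int linger = config.getSoLinger();
        socket.setSoLinger(linger >= 0, Math.max(linger, 0));
        socket.setSoTimeout(config.getSoTimeout());
        socket.setTcpNoDelay(config.getTcpNoDelay());
        socket.setTrafficClass(config.getTrafficClass());
    }

    public static void main(String[] args) throws IOException
    {
        Socket socket = new Socket(); // unconnected; options are set before connecting
        try
        {
            apply(new FixedConfiguration(), socket);
            System.out.println("soTimeout=" + socket.getSoTimeout());
        }
        finally
        {
            socket.close();
        }
    }
}
```

The buffer sizes are only hints to the underlying platform, so the values read back from the socket afterwards may differ from those requested.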
