Author: rhs Date: Wed Feb 11 19:16:48 2009 New Revision: 743455 URL: http://svn.apache.org/viewvc?rev=743455&view=rev Log: QPID-1658: added a byte limit for the number of commands in the session replay buffer, and made the buffer length configurable
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java?rev=743455&r1=743454&r2=743455&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java Wed Feb 11 19:16:48 2009 @@ -112,6 +112,19 @@ throw new UnsupportedOperationException(); } + public int getBodySize() + { + ByteBuffer body = getBody(); + if (body == null) + { + return 0; + } + else + { + return body.remaining(); + } + } + public abstract byte getEncodedTrack(); public abstract <C> void dispatch(C context, MethodDelegate<C> delegate); Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=743455&r1=743454&r2=743455&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Wed Feb 11 19:16:48 2009 @@ -95,7 +95,9 @@ // outgoing command count private int commandsOut = 0; - private Method[] commands = new Method[64*1024]; + private Method[] commands = new Method[Integer.getInteger("qpid.session.command_limit", 64*1024)]; + private int commandBytes = 0; + private int byteLimit = Integer.getInteger("qpid.session.byte_limit", 1024*1024); private int maxComplete = commandsOut - 1; private boolean needSync = false; @@ -432,7 +434,13 @@ int old = maxComplete; for (int id = max(maxComplete, lower); le(id, upper); id++) { - commands[mod(id, commands.length)] = null; + int idx = mod(id, commands.length); + Method m = commands[idx]; + if (m != null) + { + commandBytes -= m.getBodySize(); + } + commands[idx] = null; } if (le(lower, maxComplete + 1)) { @@ -462,7 +470,7 @@ final private boolean isFull(int id) { - return id - maxComplete >= commands.length; + return id - maxComplete >= commands.length || commandBytes >= byteLimit; } public void invoke(Method m) @@ -542,6 +550,7 @@ if (expiry > 0) { commands[mod(next, commands.length)] = m; + commandBytes += m.getBodySize(); } if (autoSync) { --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org