Rob Godfrey created QPID-6735:
---------------------------------
Summary: [Java Broker] Out of memory wehn consuming messages in
very large transaction
Key: QPID-6735
URL: https://issues.apache.org/jira/browse/QPID-6735
Project: Qpid
Issue Type: Bug
Components: Java Broker
Reporter: Rob Godfrey
Assignee: Rob Godfrey
The flow to disk mechanism for the Java Broker is not sufficient for dealing
with the case where large (or large numbers of) flowed to disk messages are
being sent from the broker to the client in a transaction. When sending the
message it is retrieved from disk and memory allocated for this... however
unless the housekeeping thread evicts it again it stays in memory until the
transaction is committed.
The following simple test program
{code:java}
public class Test
{
public static void main(String[] args) throws Exception
{
System.setProperty("qpid.amqp.version","0-9-1");
AMQConnection conn = new AMQConnection("127.0.0.1","admin","admin",
"foo","test");
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("queue");
final MessageProducer prod = session.createProducer(queue);
for(int i = 0; i < 20000; i++)
{
final BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(new byte[1024*1024]);
prod.send(bytesMessage);
}
Session session2 = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer cons = session2.createConsumer(queue);
while(cons.receive(1000) != null);
session2.commit();
conn.close();
}
}
{code}
with the following exception
{noformat}
########################################################################
#
# Unhandled Exception java.lang.OutOfMemoryError: Direct buffer memory in
Thread virtualhost-test-iopool-3
#
# Exiting
#
########################################################################
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:658)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
at
org.apache.qpid.bytebuffer.QpidByteBuffer.allocateDirect(QpidByteBuffer.java:464)
at
org.apache.qpid.bytebuffer.QpidByteBuffer.allocateDirectCollection(QpidByteBuffer.java:514)
at
org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore.getAllContent(AbstractBDBMessageStore.java:413)
at
org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore$StoredBDBMessage.getContentAsByteBuffer(AbstractBDBMessageStore.java:1100)
at
org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore$StoredBDBMessage.getContent(AbstractBDBMessageStore.java:1126)
at
org.apache.qpid.server.message.AbstractServerMessageImpl.getContent(AbstractServerMessageImpl.java:172)
at
org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl$MessageContentSourceBody.writePayload(ProtocolOutputConverterImpl.java:292)
at org.apache.qpid.framing.AMQFrame.writePayload(AMQFrame.java:81)
at
org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl$CompositeAMQBodyBlock.writePayload(ProtocolOutputConverterImpl.java:513)
at
org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8.writeFrame(AMQPConnection_0_8.java:434)
at
org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeFrame(ProtocolOutputConverterImpl.java:465)
at
org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeMessageDeliveryUnchanged(ProtocolOutputConverterImpl.java:224)
at
org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeMessageDelivery(ProtocolOutputConverterImpl.java:148)
at
org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeMessageDelivery(ProtocolOutputConverterImpl.java:101)
at
org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeDeliver(ProtocolOutputConverterImpl.java:78)
at
org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8$WriteDeliverMethod.deliverToClient(AMQPConnection_0_8.java:1421)
at
org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8.sendToClient(ConsumerTarget_0_8.java:489)
at
org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8$AckConsumer.doSend(ConsumerTarget_0_8.java:293)
at
org.apache.qpid.server.consumer.AbstractConsumerTarget.sendNextMessage(AbstractConsumerTarget.java:211)
at
org.apache.qpid.server.consumer.AbstractConsumerTarget.processPending(AbstractConsumerTarget.java:63)
at
org.apache.qpid.server.protocol.v0_8.AMQChannel.processPending(AMQChannel.java:3757)
at
org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8.processPending(AMQPConnection_0_8.java:1549)
at
org.apache.qpid.server.transport.MultiVersionProtocolEngine.processPending(MultiVersionProtocolEngine.java:197)
at
org.apache.qpid.server.transport.NonBlockingConnection.doWork(NonBlockingConnection.java:220)
at
org.apache.qpid.server.transport.NetworkConnectionScheduler.processConnection(NetworkConnectionScheduler.java:121)
at
org.apache.qpid.server.transport.NetworkConnectionScheduler.access$000(NetworkConnectionScheduler.java:37)
at
org.apache.qpid.server.transport.NetworkConnectionScheduler$2.run(NetworkConnectionScheduler.java:102)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
{noformat}
with the broker VM memory settings as follows
{noformat}
-Dvirtualhost.connectionThreadPool.maximum=20
-Dvirtualhost.connectionThreadPool.minimum=16 -verbose:gc -Xmx2048m -Xms1024m
-XX:MaxDirectMemorySize=3072m
{noformat}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]