[ 
https://issues.apache.org/jira/browse/QPID-6735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Keith Wall resolved QPID-6735.
------------------------------
       Resolution: Fixed
    Fix Version/s: qpid-java-6.0

> [Java Broker] Out of memory when consuming messages in very large transaction
> -----------------------------------------------------------------------------
>
>                 Key: QPID-6735
>                 URL: https://issues.apache.org/jira/browse/QPID-6735
>             Project: Qpid
>          Issue Type: Bug
>          Components: Java Broker
>            Reporter: Rob Godfrey
>            Assignee: Keith Wall
>             Fix For: qpid-java-6.0
>
>
> The flow to disk mechanism for the Java Broker is not sufficient for dealing 
> with the case where large (or large numbers of) flowed to disk messages are 
> being sent from the broker to the client in a transaction.  When sending the 
> message it is retrieved from disk and memory allocated for this... however 
> unless the housekeeping thread evicts it again it stays in memory until the 
> transaction is committed.
> The following simple test program 
> {code:java}
> public class Test
> {
>     public static void main(String[] args) throws Exception
>     {
>         System.setProperty("qpid.amqp.version","0-9-1");
>         AMQConnection conn = new AMQConnection("127.0.0.1","admin","admin", 
> "foo","test");
>         conn.start();
>         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
>         Queue queue = session.createQueue("queue");
>         final MessageProducer prod = session.createProducer(queue);
>         for(int i = 0; i < 20000; i++)
>         {
>             final BytesMessage bytesMessage = session.createBytesMessage();
>             bytesMessage.writeBytes(new byte[1024*1024]);
>             prod.send(bytesMessage);
>         }
>         Session session2 = conn.createSession(true, 
> Session.SESSION_TRANSACTED);
>         MessageConsumer cons = session2.createConsumer(queue);
>         while(cons.receive(1000) != null);
>         session2.commit();
>         conn.close();
>     }
> }
> {code}
> with the following exception
> {noformat}
> ########################################################################
> #
> # Unhandled Exception java.lang.OutOfMemoryError: Direct buffer memory in 
> Thread virtualhost-test-iopool-3
> #
> # Exiting
> #
> ########################################################################
> java.lang.OutOfMemoryError: Direct buffer memory
>       at java.nio.Bits.reserveMemory(Bits.java:658)
>       at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>       at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
>       at 
> org.apache.qpid.bytebuffer.QpidByteBuffer.allocateDirect(QpidByteBuffer.java:464)
>       at 
> org.apache.qpid.bytebuffer.QpidByteBuffer.allocateDirectCollection(QpidByteBuffer.java:514)
>       at 
> org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore.getAllContent(AbstractBDBMessageStore.java:413)
>       at 
> org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore$StoredBDBMessage.getContentAsByteBuffer(AbstractBDBMessageStore.java:1100)
>       at 
> org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore$StoredBDBMessage.getContent(AbstractBDBMessageStore.java:1126)
>       at 
> org.apache.qpid.server.message.AbstractServerMessageImpl.getContent(AbstractServerMessageImpl.java:172)
>       at 
> org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl$MessageContentSourceBody.writePayload(ProtocolOutputConverterImpl.java:292)
>       at org.apache.qpid.framing.AMQFrame.writePayload(AMQFrame.java:81)
>       at 
> org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl$CompositeAMQBodyBlock.writePayload(ProtocolOutputConverterImpl.java:513)
>       at 
> org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8.writeFrame(AMQPConnection_0_8.java:434)
>       at 
> org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeFrame(ProtocolOutputConverterImpl.java:465)
>       at 
> org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeMessageDeliveryUnchanged(ProtocolOutputConverterImpl.java:224)
>       at 
> org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeMessageDelivery(ProtocolOutputConverterImpl.java:148)
>       at 
> org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeMessageDelivery(ProtocolOutputConverterImpl.java:101)
>       at 
> org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeDeliver(ProtocolOutputConverterImpl.java:78)
>       at 
> org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8$WriteDeliverMethod.deliverToClient(AMQPConnection_0_8.java:1421)
>       at 
> org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8.sendToClient(ConsumerTarget_0_8.java:489)
>       at 
> org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8$AckConsumer.doSend(ConsumerTarget_0_8.java:293)
>       at 
> org.apache.qpid.server.consumer.AbstractConsumerTarget.sendNextMessage(AbstractConsumerTarget.java:211)
>       at 
> org.apache.qpid.server.consumer.AbstractConsumerTarget.processPending(AbstractConsumerTarget.java:63)
>       at 
> org.apache.qpid.server.protocol.v0_8.AMQChannel.processPending(AMQChannel.java:3757)
>       at 
> org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8.processPending(AMQPConnection_0_8.java:1549)
>       at 
> org.apache.qpid.server.transport.MultiVersionProtocolEngine.processPending(MultiVersionProtocolEngine.java:197)
>       at 
> org.apache.qpid.server.transport.NonBlockingConnection.doWork(NonBlockingConnection.java:220)
>       at 
> org.apache.qpid.server.transport.NetworkConnectionScheduler.processConnection(NetworkConnectionScheduler.java:121)
>       at 
> org.apache.qpid.server.transport.NetworkConnectionScheduler.access$000(NetworkConnectionScheduler.java:37)
>       at 
> org.apache.qpid.server.transport.NetworkConnectionScheduler$2.run(NetworkConnectionScheduler.java:102)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:724)
> {noformat}
> with the broker VM memory settings as follows 
> {noformat}
> -Dvirtualhost.connectionThreadPool.maximum=20 
> -Dvirtualhost.connectionThreadPool.minimum=16 -verbose:gc -Xmx2048m -Xms1024m 
> -XX:MaxDirectMemorySize=3072m
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to