Updated Branches: refs/heads/trunk 85d3413ff -> d12dbb322
Fixes APLO-342: Memory leaks when using openwire with transactions Project: http://git-wip-us.apache.org/repos/asf/activemq-apollo/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-apollo/commit/d12dbb32 Tree: http://git-wip-us.apache.org/repos/asf/activemq-apollo/tree/d12dbb32 Diff: http://git-wip-us.apache.org/repos/asf/activemq-apollo/diff/d12dbb32 Branch: refs/heads/trunk Commit: d12dbb322122c07c77c47ae94ffd33f425895518 Parents: 85d3413 Author: Hiram Chirino <[email protected]> Authored: Thu Jan 30 13:20:45 2014 -0500 Committer: Hiram Chirino <[email protected]> Committed: Thu Jan 30 13:20:45 2014 -0500 ---------------------------------------------------------------------- .../openwire/OpenwireProtocolHandler.scala | 2 ++ .../apollo/openwire/test/TransactionTest.scala | 28 ++++++++++++++++++++ .../activemq/apollo/util/FunSuiteSupport.scala | 8 ++++++ 3 files changed, 38 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/d12dbb32/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala ---------------------------------------------------------------------- diff --git a/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala b/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala index cde434e..97aa182 100644 --- a/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala +++ b/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala @@ -587,11 +587,13 @@ class OpenwireProtocolHandler extends ProtocolHandler { case TransactionInfo.COMMIT_ONE_PHASE => get_tx_ctx(id).commit { + remove_tx_ctx(id) ack(info) } case TransactionInfo.ROLLBACK => get_tx_ctx(id).rollback + remove_tx_ctx(id) ack(info) case TransactionInfo.FORGET => http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/d12dbb32/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala ---------------------------------------------------------------------- diff --git a/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala b/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala index 20fab9e..3a1fa71 100644 --- a/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala +++ b/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala @@ -128,4 +128,32 @@ class OpenwireLevelDBTransactionTest extends TransactionTest { // disconnect() } } + ignore("APLO-342: Test memory usage"){ + connect() + val dest = queue(next_id("example")) + val message_count = 1000000 + val producer_session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val producer = producer_session.createProducer(dest) + producer.setDeliveryMode(DeliveryMode.PERSISTENT) + + val consumer_session = default_connection.createSession(true, Session.SESSION_TRANSACTED) + val consumer = consumer_session.createConsumer(dest) + + for( i <- 1 to message_count) { + if( (i % (message_count/100)) == 0) { + println("On message: %d, jvm heap: %.2f".format(i, getJVMHeapUsage/(1024*1024.0))) + } + val x = producer_session.createTextMessage("x" * (1024*64)) + x.setIntProperty("i", i) + producer.send(x) + + val m = consumer.receive(1000).asInstanceOf[TextMessage] + m should not be (null) + m.getIntProperty("i") should be (i) + consumer_session.commit() + } + } + + } + http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/d12dbb32/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala ---------------------------------------------------------------------- diff --git a/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala b/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala index d3fa066..410f6d0 100644 --- a/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala +++ b/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala @@ -28,6 +28,9 @@ import scala.Some import org.apache.activemq.apollo.util.FunSuiteSupport._ import java.util.concurrent.locks.{ReentrantReadWriteLock, Lock, ReadWriteLock} import java.util.concurrent.atomic.AtomicLong +import java.lang.management.ManagementFactory +import javax.management.ObjectName +import javax.management.openmbean.CompositeData object FunSuiteSupport { class SkipTestException extends RuntimeException @@ -58,6 +61,11 @@ abstract class FunSuiteSupport extends FunSuite with Logging with ParallelBefore def skip(check:Boolean=true):Unit = if(check) throw new SkipTestException() + def getJVMHeapUsage = { + val mbean_server = ManagementFactory.getPlatformMBeanServer() + val data = mbean_server.getAttribute(new ObjectName("java.lang:type=Memory"), "HeapMemoryUsage").asInstanceOf[CompositeData] + data.get("used").asInstanceOf[java.lang.Long].longValue() + } var _log:Log = null override protected def log: Log = {
