Repository: activemq
Updated Branches:
  refs/heads/trunk ab3de0c4c -> 67ead201e
https://issues.apache.org/jira/browse/AMQ-5266 https://issues.apache.org/jira/browse/AMQ-4485 - single dest test with low limit exposed ignored setbatch in kahadb when sequence was not found in the index due to acking - resolved and validated with test that verifies dlq is empty Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/67ead201 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/67ead201 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/67ead201 Branch: refs/heads/trunk Commit: 67ead201e1cb0e7dc019002d7a2e4be53184261d Parents: ab3de0c Author: gtully <gary.tu...@gmail.com> Authored: Tue Oct 21 16:04:54 2014 +0100 Committer: gtully <gary.tu...@gmail.com> Committed: Tue Oct 21 16:05:46 2014 +0100 ---------------------------------------------------------------------- .../apache/activemq/broker/region/Queue.java | 4 +- .../activemq/store/kahadb/MessageDatabase.java | 17 +++----- .../java/org/apache/activemq/TestSupport.java | 2 +- .../activemq/bugs/AMQ5266SingleDestTest.java | 45 +++++--------------- 4 files changed, 21 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/67ead201/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 21d7522..5778fed 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1200,7 +1200,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } finally { pagedInMessagesLock.readLock().unlock(); } - messagesLock.readLock().lock(); + 
messagesLock.writeLock().lock(); try{ try { messages.reset(); @@ -1217,7 +1217,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index messages.release(); } }finally { - messagesLock.readLock().unlock(); + messagesLock.writeLock().unlock(); } return null; } http://git-wip-us.apache.org/repos/asf/activemq/blob/67ead201/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 4de5f16..88dde75 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -2791,7 +2791,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe BTreeIndex<Long, MessageKeys> defaultPriorityIndex; BTreeIndex<Long, MessageKeys> lowPriorityIndex; BTreeIndex<Long, MessageKeys> highPriorityIndex; - MessageOrderCursor cursor = new MessageOrderCursor(); + final MessageOrderCursor cursor = new MessageOrderCursor(); Long lastDefaultKey; Long lastHighKey; Long lastLowKey; @@ -2892,16 +2892,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe if (defaultPriorityIndex.containsKey(tx, sequence)) { lastDefaultKey = sequence; cursor.defaultCursorPosition = nextPosition.longValue(); - } else if (highPriorityIndex != null) { - if (highPriorityIndex.containsKey(tx, sequence)) { - lastHighKey = sequence; - cursor.highPriorityCursorPosition = nextPosition.longValue(); - } else if (lowPriorityIndex.containsKey(tx, sequence)) { - lastLowKey = sequence; - cursor.lowPriorityCursorPosition = nextPosition.longValue(); - } + } else if (highPriorityIndex != null && 
highPriorityIndex.containsKey(tx, sequence)) { + lastHighKey = sequence; + cursor.highPriorityCursorPosition = nextPosition.longValue(); + } else if (lowPriorityIndex.containsKey(tx, sequence)) { + lastLowKey = sequence; + cursor.lowPriorityCursorPosition = nextPosition.longValue(); } else { - LOG.warn("setBatch: sequence " + sequence + " not found in orderindex:" + this); lastDefaultKey = sequence; cursor.defaultCursorPosition = nextPosition.longValue(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/67ead201/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java index 46eecfb..80aac14 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java @@ -179,7 +179,7 @@ public abstract class TestSupport extends CombinationTestSupport { return setPersistenceAdapter(broker, defaultPersistenceAdapter); } - public PersistenceAdapter setPersistenceAdapter(BrokerService broker, PersistenceAdapterChoice choice) throws IOException { + public static PersistenceAdapter setPersistenceAdapter(BrokerService broker, PersistenceAdapterChoice choice) throws IOException { PersistenceAdapter adapter = null; switch (choice) { case JDBC: http://git-wip-us.apache.org/repos/asf/activemq/blob/67ead201/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java index afcf54f..131f807 100644 --- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java @@ -35,15 +35,13 @@ import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.TestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; -import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.derby.jdbc.EmbeddedDataSource; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -63,7 +61,6 @@ public class AMQ5266SingleDestTest { static Logger LOG = LoggerFactory.getLogger(AMQ5266SingleDestTest.class); String activemqURL; BrokerService brokerService; - private EmbeddedDataSource dataSource; public int numDests = 1; public int messageSize = 10*1000; @@ -84,7 +81,7 @@ public class AMQ5266SingleDestTest { public boolean useCache = true; @Parameterized.Parameter(5) - public boolean useDefaultStore = false; + public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB; @Parameterized.Parameter(6) public boolean optimizeDispatch = false; @@ -92,7 +89,7 @@ public class AMQ5266SingleDestTest { @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}") public static Iterable<Object[]> parameters() { return Arrays.asList(new Object[][]{ - {1000, 80, 80, 1024*1024*5, 
true, true, false}, + {1000, 80, 80, 1024*1024*1, true, TestSupport.PersistenceAdapterChoice.KahaDB, false}, }); } @@ -102,22 +99,10 @@ public class AMQ5266SingleDestTest { public void startBroker() throws Exception { brokerService = new BrokerService(); - dataSource = new EmbeddedDataSource(); - dataSource.setDatabaseName("target/derbyDb"); - dataSource.setCreateDatabase("create"); - - JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter(); - jdbcPersistenceAdapter.setDataSource(dataSource); - jdbcPersistenceAdapter.setUseLock(false); - - if (!useDefaultStore) { - brokerService.setPersistenceAdapter(jdbcPersistenceAdapter); - } else { - KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter(); - kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true); - } + TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice); brokerService.setDeleteAllMessagesOnStartup(true); brokerService.setUseJmx(false); + brokerService.setAdvisorySupport(false); PolicyMap policyMap = new PolicyMap(); @@ -133,11 +118,12 @@ public class AMQ5266SingleDestTest { policyMap.setDefaultEntry(defaultEntry); brokerService.setDestinationPolicy(policyMap); - brokerService.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024); + brokerService.getSystemUsage().getMemoryUsage().setLimit(64 * 1024 * 1024); TransportConnector transportConnector = brokerService.addConnector("tcp://0.0.0.0:0"); brokerService.start(); activemqURL = transportConnector.getPublishableConnectString(); + activemqURL += "?jms.watchTopicAdvisories=false"; // ensure all messages are queue or dlq messages } @After @@ -145,10 +131,6 @@ public class AMQ5266SingleDestTest { if (brokerService != null) { brokerService.stop(); } - try { - dataSource.setShutdownDatabase("shutdown"); - dataSource.getConnection(); - } catch (Exception ignored) {} } @Test @@ -202,9 +184,6 @@ public class AMQ5266SingleDestTest { try { int secs 
= (int) (endWait - System.currentTimeMillis()) / 1000; LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs"); - if (!useDefaultStore) { - DefaultJDBCAdapter.dumpTables(dataSource.getConnection()); - } Thread.sleep(1000); } catch (Exception e) { } @@ -217,11 +196,6 @@ public class AMQ5266SingleDestTest { consumer.shutdown(); TimeUnit.SECONDS.sleep(2); - LOG.info("DB Contents START"); - if (!useDefaultStore) { - DefaultJDBCAdapter.dumpTables(dataSource.getConnection()); - } - LOG.info("DB Contents END"); LOG.info("Consumer Stats:"); @@ -243,6 +217,9 @@ public class AMQ5266SingleDestTest { assertEquals("expect to get all messages!", 0, diff); } + + // verify empty dlq + assertEquals("No pending messages", 0l, ((RegionBroker) brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount()); } public class ExportQueuePublisher {