Author: tabish
Date: Tue Mar 20 16:26:07 2012
New Revision: 1302977
URL: http://svn.apache.org/viewvc?rev=1302977&view=rev
Log:
fix and test for: https://issues.apache.org/jira/browse/AMQ-3780
ensure that non-persistent messages are cleaned up from temp storage when the
Queue is deleted.
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=1302977&r1=1302976&r2=1302977&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
Tue Mar 20 16:26:07 2012
@@ -26,8 +26,6 @@ import org.slf4j.LoggerFactory;
/**
* Store based Cursor for Queues
- *
- *
*/
public class StoreQueueCursor extends AbstractPendingMessageCursor {
@@ -42,7 +40,7 @@ public class StoreQueueCursor extends Ab
/**
* Construct
- * @param broker
+ * @param broker
* @param queue
*/
public StoreQueueCursor(Broker broker,Queue queue) {
@@ -78,6 +76,7 @@ public class StoreQueueCursor extends Ab
public synchronized void stop() throws Exception {
started = false;
if (nonPersistent != null) {
+ nonPersistent.clear();
nonPersistent.stop();
nonPersistent.gc();
}
@@ -101,7 +100,7 @@ public class StoreQueueCursor extends Ab
}
}
}
-
+
public synchronized void addMessageFirst(MessageReference node) throws
Exception {
if (node != null) {
Message msg = node.getMessage();
@@ -155,9 +154,9 @@ public class StoreQueueCursor extends Ab
public synchronized void reset() {
nonPersistent.reset();
persistent.reset();
- pendingCount = persistent.size() + nonPersistent.size();
+ pendingCount = persistent.size() + nonPersistent.size();
}
-
+
public void release() {
nonPersistent.release();
persistent.release();
@@ -179,7 +178,7 @@ public class StoreQueueCursor extends Ab
/**
* Informs the Broker if the subscription needs to intervention to recover
* it's state e.g. DurableTopicSubscriber may do
- *
+ *
* @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
* @return true if recovery required
*/
@@ -208,8 +207,8 @@ public class StoreQueueCursor extends Ab
}
super.setMaxBatchSize(maxBatchSize);
}
-
-
+
+
public void setMaxProducersToAudit(int maxProducersToAudit) {
super.setMaxProducersToAudit(maxProducersToAudit);
if (persistent != null) {
@@ -229,7 +228,7 @@ public class StoreQueueCursor extends Ab
nonPersistent.setMaxAuditDepth(maxAuditDepth);
}
}
-
+
public void setEnableAudit(boolean enableAudit) {
super.setEnableAudit(enableAudit);
if (persistent != null) {
@@ -239,7 +238,7 @@ public class StoreQueueCursor extends Ab
nonPersistent.setEnableAudit(enableAudit);
}
}
-
+
@Override
public void setUseCache(boolean useCache) {
super.setUseCache(useCache);
@@ -250,7 +249,7 @@ public class StoreQueueCursor extends Ab
nonPersistent.setUseCache(useCache);
}
}
-
+
@Override
public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java?rev=1302977&r1=1302976&r2=1302977&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
Tue Mar 20 16:26:07 2012
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.thread.Scheduler;
@@ -191,6 +192,10 @@ public class PListStore extends ServiceS
}
}
+ public Journal getJournal() {
+ return this.journal;
+ }
+
public File getDirectory() {
return directory;
}
@@ -354,9 +359,9 @@ public class PListStore extends ServiceS
public void run() {
try {
- if (isStopping()) {
+ if (isStopping()) {
return;
- }
+ }
final int lastJournalFileId =
journal.getLastAppendLocation().getDataFileId();
final Set<Integer> candidates = journal.getFileMap().keySet();
LOG.trace("Full gc candidate set:" + candidates);
@@ -370,7 +375,7 @@ public class PListStore extends ServiceS
List<PList> plists = null;
synchronized (indexLock) {
synchronized (this) {
- plists = new ArrayList(persistentLists.values());
+ plists = new
ArrayList<PList>(persistentLists.values());
}
}
for (PList list : plists) {
@@ -481,5 +486,4 @@ public class PListStore extends ServiceS
String path = getDirectory() != null ?
getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
return "PListStore:[" + path + " ]";
}
-
}
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java?rev=1302977&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
Tue Mar 20 16:26:07 2012
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TempStoreDataCleanupTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TempStoreDataCleanupTest.class);
+ private static final String QUEUE_NAME =
TempStoreDataCleanupTest.class.getName() + "Queue";
+
+ private final String str = new String(
+ "QAa0bcLdUK2eHfJgTP8XhiFj61DOklNm9nBoI5pGqYVrs3CtSuMZvwWx4yE7zR");
+
+ private BrokerService broker;
+ private String connectionUri;
+ private ExecutorService pool;
+ private String queueName;
+ private Random r = new Random();
+
+ @Before
+ public void setUp() throws Exception {
+
+ broker = new BrokerService();
+ broker.setDataDirectory("target" + File.separator + "activemq-data");
+ broker.setPersistent(true);
+ broker.setUseJmx(true);
+ broker.setDedicatedTaskRunner(false);
+ broker.setAdvisorySupport(false);
+ broker.setDeleteAllMessagesOnStartup(true);
+
+ SharedDeadLetterStrategy strategy = new SharedDeadLetterStrategy();
+ strategy.setProcessExpired(false);
+ strategy.setProcessNonPersistent(false);
+
+ PolicyEntry defaultPolicy = new PolicyEntry();
+ defaultPolicy.setQueue(">");
+ defaultPolicy.setOptimizedDispatch(true);
+ defaultPolicy.setDeadLetterStrategy(strategy);
+ defaultPolicy.setMemoryLimit(9000000);
+
+ PolicyMap policyMap = new PolicyMap();
+ policyMap.setDefaultEntry(defaultPolicy);
+
+ broker.setDestinationPolicy(policyMap);
+
+ broker.getSystemUsage().getMemoryUsage().setLimit(300000000L);
+
+ broker.addConnector("tcp://localhost:0").setName("Default");
+ broker.start();
+ broker.waitUntilStarted();
+
+ connectionUri =
broker.getTransportConnectors().get(0).getPublishableConnectString();
+ pool = Executors.newFixedThreadPool(10);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (broker != null) {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+
+ if (pool != null) {
+ pool.shutdown();
+ }
+ }
+
+ @Test
+ public void testIt() throws Exception {
+
+ for (int i = 0; i < 2; i++) {
+ LOG.info("Started the test iteration: " + i + " using queueName =
" + queueName);
+ queueName = QUEUE_NAME + i;
+ final CountDownLatch latch = new CountDownLatch(11);
+
+ pool.execute(new Runnable() {
+ @Override
+ public void run() {
+ receiveAndDiscard100messages(latch);
+ }
+ });
+
+ for (int j = 0; j < 10; j++) {
+ pool.execute(new Runnable() {
+ @Override
+ public void run() {
+ send10000messages(latch);
+ }
+ });
+ }
+
+ LOG.info("Waiting on the send / receive latch");
+ latch.await(5, TimeUnit.MINUTES);
+ LOG.info("Resumed");
+
+ destroyQueue();
+ TimeUnit.SECONDS.sleep(2);
+ }
+
+ final PListStore pa = broker.getTempDataStore();
+ assertTrue("only one journal file should be left: " +
pa.getJournal().getFileMap().size(),
+ Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return pa.getJournal().getFileMap().size() == 1;
+ }
+ }, TimeUnit.MINUTES.toMillis(3))
+ );
+ }
+
+ public void destroyQueue() {
+ try {
+ Broker broker = this.broker.getBroker();
+ if (!broker.isStopped()) {
+ LOG.info("Removing: " + queueName);
+
broker.removeDestination(this.broker.getAdminConnectionContext(), new
ActiveMQQueue(queueName), 10);
+ }
+ } catch (Exception e) {
+ LOG.warn("Got an error while removing the test queue", e);
+ }
+ }
+
+ private void send10000messages(CountDownLatch latch) {
+ ActiveMQConnection activeMQConnection = null;
+ try {
+ activeMQConnection = createConnection(null);
+ Session session = activeMQConnection.createSession(false,
+ Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(session
+ .createQueue(queueName));
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ activeMQConnection.start();
+ for (int i = 0; i < 10000; i++) {
+ TextMessage textMessage = session.createTextMessage();
+ textMessage.setText(generateBody(1000));
+ textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ producer.send(textMessage);
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ }
+ }
+ producer.close();
+ } catch (JMSException e) {
+ LOG.warn("Got an error while sending the messages", e);
+ } finally {
+ if (activeMQConnection != null) {
+ try {
+ activeMQConnection.close();
+ } catch (JMSException e) {
+ }
+ }
+ }
+ latch.countDown();
+ }
+
+ private void receiveAndDiscard100messages(CountDownLatch latch) {
+ ActiveMQConnection activeMQConnection = null;
+ try {
+ activeMQConnection = createConnection(null);
+ Session session = activeMQConnection.createSession(false,
+ Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer messageConsumer = session.createConsumer(
+ session.createQueue(queueName));
+ activeMQConnection.start();
+ for (int i = 0; i < 100; i++) {
+ messageConsumer.receive();
+ }
+ messageConsumer.close();
+ LOG.info("Created and disconnected");
+ } catch (JMSException e) {
+ LOG.warn("Got an error while receiving the messages", e);
+ } finally {
+ if (activeMQConnection != null) {
+ try {
+ activeMQConnection.close();
+ } catch (JMSException e) {
+ }
+ }
+ }
+ latch.countDown();
+ }
+
+ private ActiveMQConnection createConnection(String id) throws JMSException
{
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(connectionUri);
+ if (id != null) {
+ factory.setClientID(id);
+ }
+
+ ActiveMQConnection connection = (ActiveMQConnection)
factory.createConnection();
+ return connection;
+ }
+
+ private String generateBody(int length) {
+
+ StringBuilder sb = new StringBuilder();
+ int te = 0;
+ for (int i = 1; i <= length; i++) {
+ te = r.nextInt(62);
+ sb.append(str.charAt(te));
+ }
+ return sb.toString();
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
------------------------------------------------------------------------------
svn:eol-style = native