Author: gtully
Date: Wed Sep 3 07:14:38 2008
New Revision: 691621
URL: http://svn.apache.org/viewvc?rev=691621&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-1926 with test case
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?rev=691621&r1=691620&r2=691621&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
Wed Sep 3 07:14:38 2008
@@ -434,6 +434,9 @@
purgeList.add(dataFile);
}
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("lastFileId=" + lastFile + ", purgeList: (" + purgeList.size() + ") " + purgeList);
+ }
for (DataFile dataFile : purgeList) {
forceRemoveDataFile(dataFile);
}
@@ -465,17 +468,17 @@
throws IOException {
accessorPool.disposeDataFileAccessors(dataFile);
fileByFileMap.remove(dataFile.getFile());
- DataFile removed = fileMap.remove(dataFile.getDataFileId());
storeSize.addAndGet(-dataFile.getLength());
dataFile.unlink();
if (archiveDataLogs) {
dataFile.move(getDirectoryArchive());
- LOG.info("moved data file " + dataFile + " to "
+ LOG.debug("moved data file " + dataFile + " to "
+ getDirectoryArchive());
} else {
boolean result = dataFile.delete();
- LOG.info("discarding data file " + dataFile
- + (result ? "successful " : "failed"));
+ if (!result) {
+ LOG.info("Failed to discard data file " + dataFile);
+ }
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=691621&r1=691620&r2=691621&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
Wed Sep 3 07:14:38 2008
@@ -261,6 +261,9 @@
data = messages.remove(id);
if (data == null) {
messageAcks.add(ack);
+ } else {
+ // message never got written so datafileReference will still exist
+ AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, data.getFileId());
}
}finally {
lock.unlock();
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=691621&r1=691620&r2=691621&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
Wed Sep 3 07:14:38 2008
@@ -26,9 +26,9 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activeio.journal.Journal;
@@ -122,7 +122,7 @@
private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH;
- private Map<AMQMessageStore,Set<Integer>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Set<Integer>> ();
+ private Map<AMQMessageStore,Map<Integer, AtomicInteger>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Map<Integer, AtomicInteger>> ();
private String directoryPath = "";
private RandomAccessFile lockFile;
private FileLock lock;
@@ -271,14 +271,14 @@
checkpoint(false);
}
};
- Scheduler.executePeriodically(periodicCheckpointTask,
checkpointInterval);
+ Scheduler.executePeriodically(periodicCheckpointTask,
getCheckpointInterval());
periodicCleanupTask = new Runnable() {
public void run() {
cleanup();
}
};
- Scheduler.executePeriodically(periodicCleanupTask, cleanupInterval);
+ Scheduler.executePeriodically(periodicCleanupTask,
getCleanupInterval());
if (lockAquired && lockLogged) {
LOG.info("Aquired lock for AMQ Store" + getDirectory());
@@ -426,8 +426,11 @@
public void cleanup() {
try {
Set<Integer>inProgress = new HashSet<Integer>();
- for (Set<Integer> set: dataFilesInProgress.values()) {
- inProgress.addAll(set);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("dataFilesInProgress.values: (" + dataFilesInProgress.values().size() + ") " + dataFilesInProgress.values());
+ }
+ for (Map<Integer, AtomicInteger> set: dataFilesInProgress.values()) {
+ inProgress.addAll(set.keySet());
}
Integer lastDataFile = asyncDataManager.getCurrentDataFileId();
inProgress.add(lastDataFile);
@@ -437,6 +440,7 @@
if (lastActiveTx != null) {
lastDataFile = Math.min(lastDataFile,
lastActiveTx.getDataFileId());
}
+ LOG.debug("lastDataFile: " + lastDataFile);
asyncDataManager.consolidateDataFilesNotIn(inProgress,
lastDataFile - 1);
} catch (IOException e) {
LOG.error("Could not cleanup data files: " + e, e);
@@ -967,18 +971,32 @@
protected void addInProgressDataFile(AMQMessageStore store,int
dataFileId) {
- Set<Integer>set = dataFilesInProgress.get(store);
- if (set == null) {
- set = new CopyOnWriteArraySet<Integer>();
- dataFilesInProgress.put(store, set);
+ Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
+ if (map == null) {
+ map = new ConcurrentHashMap<Integer, AtomicInteger>();
+ dataFilesInProgress.put(store, map);
+ }
+ AtomicInteger count = map.get(dataFileId);
+ if (count == null) {
+ count = new AtomicInteger(0);
+ map.put(dataFileId, count);
}
- set.add(dataFileId);
+ count.incrementAndGet();
}
protected void removeInProgressDataFile(AMQMessageStore store,int
dataFileId) {
- Set<Integer>set = dataFilesInProgress.get(store);
- if (set != null) {
- set.remove(dataFileId);
+ Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
+ if (map != null) {
+ AtomicInteger count = map.get(dataFileId);
+ if (count != null) {
+ int newCount = count.decrementAndGet();
+ if (newCount <=0) {
+ map.remove(dataFileId);
+ }
+ }
+ if (map.isEmpty()) {
+ dataFilesInProgress.remove(store);
+ }
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java?rev=691621&r1=691620&r2=691621&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
Wed Sep 3 07:14:38 2008
@@ -72,6 +72,7 @@
result.setUseNio(isUseNio());
result.setMaxFileLength(getMaxFileLength());
result.setCleanupInterval(getCleanupInterval());
+ result.setCheckpointInterval(getCheckpointInterval());
result.setIndexBinSize(getIndexBinSize());
result.setIndexKeySize(getIndexKeySize());
result.setIndexPageSize(getIndexPageSize());
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java?rev=691621&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java
Wed Sep 3 07:14:38 2008
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.kaha.impl.async.AsyncDataManager;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/*
+ * see https://issues.apache.org/activemq/browse/AMQ-1926
+ */
+public class DataFileNotDeletedTest extends TestCase {
+
+ private static final Log LOG =
LogFactory.getLog(DataFileNotDeletedTest.class);
+
+ private final CountDownLatch latch = new CountDownLatch(max_messages);
+ private static int max_messages = 600;
+ private static int messageCounter;
+ private String destinationName = getName()+"_Queue";
+ private BrokerService broker;
+ private Connection receiverConnection;
+ private Connection producerConnection;
+ final boolean useTopic = false;
+
+ AMQPersistenceAdapter persistentAdapter;
+ protected static final String payload = new String(new byte[512]);
+
+ public void setUp() throws Exception {
+ messageCounter = 0;
+ startBroker();
+ receiverConnection = createConnection();
+ receiverConnection.start();
+ producerConnection = createConnection();
+ producerConnection.start();
+ }
+
+ public void tearDown() throws Exception {
+ receiverConnection.close();
+ producerConnection.close();
+ broker.stop();
+ }
+
+ public void testForDataFileNotDeleted() throws Exception {
+ doTestForDataFileNotDeleted(false);
+ }
+
+ public void testForDataFileNotDeletedTransacted() throws Exception {
+ doTestForDataFileNotDeleted(true);
+ }
+
+ private void doTestForDataFileNotDeleted(boolean transacted) throws
Exception {
+
+ Receiver receiver = new Receiver() {
+ public void receive(String s) throws Exception {
+ messageCounter++;
+ latch.countDown();
+ }
+ };
+ buildReceiver(receiverConnection, destinationName, transacted,
receiver, useTopic);
+
+ final MessageSender producer = new MessageSender(destinationName,
producerConnection, transacted, useTopic);
+ for (int i=0; i< max_messages; i++) {
+ producer.send(payload );
+ }
+ latch.await();
+ assertEquals(max_messages, messageCounter);
+ waitFordataFilesToBeCleanedUp(persistentAdapter.getAsyncDataManager(),
30000, 2);
+ }
+
+ private void waitFordataFilesToBeCleanedUp(
+ AsyncDataManager asyncDataManager, int timeout, int numExpected)
throws InterruptedException {
+ long expiry = System.currentTimeMillis() + timeout;
+ while(expiry > System.currentTimeMillis()) {
+ if (asyncDataManager.getFiles().size() <= numExpected) {
+ break;
+ } else {
+ Thread.sleep(1000);
+ }
+ }
+ assertEquals("persistence adapter dataManager has correct number of files", 2, asyncDataManager.getFiles().size());
+ }
+
+ private Connection createConnection() throws JMSException {
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://localhost:61616");
+ return factory.createConnection();
+ }
+
+ private void startBroker() throws Exception {
+ broker = new BrokerService();
+ broker.setDeleteAllMessagesOnStartup(true);
+ broker.setPersistent(true);
+ broker.setUseJmx(true);
+ broker.addConnector("tcp://localhost:61616").setName("Default");
+
+ AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory)
broker.getPersistenceFactory();
+ // ensure there are a bunch of data files but multiple entries in each
+ factory.setMaxFileLength(1024 * 20);
+ // speed up the test case, checkpoint an cleanup early and often
+ factory.setCheckpointInterval(500);
+ factory.setCleanupInterval(500);
+ factory.setSyncOnWrite(false);
+
+ persistentAdapter = (AMQPersistenceAdapter)
broker.getPersistenceAdapter();
+ broker.start();
+ LOG.info("Starting broker..");
+ }
+
+ private void buildReceiver(Connection connection, final String queueName,
boolean transacted, final Receiver receiver, boolean isTopic) throws Exception {
+ final Session session = transacted ? connection.createSession(true,
Session.SESSION_TRANSACTED) : connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer inputMessageConsumer = session.createConsumer(isTopic
? session.createTopic(queueName) : session.createQueue(queueName));
+ MessageListener messageListener = new MessageListener() {
+
+ public void onMessage(Message message) {
+ try {
+ ObjectMessage objectMessage = (ObjectMessage)message;
+ String s = (String)objectMessage.getObject();
+ receiver.receive(s);
+ if (session.getTransacted()) {
+ session.commit();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ inputMessageConsumer.setMessageListener(messageListener);
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date