Author: tabish
Date: Fri Sep 16 19:31:30 2011
New Revision: 1171743
URL: http://svn.apache.org/viewvc?rev=1171743&view=rev
Log:
fixes for https://issues.apache.org/jira/browse/AMQ-3467
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1171743&r1=1171742&r2=1171743&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Fri Sep 16 19:31:30 2011
@@ -157,7 +157,7 @@ public class KahaDBStore extends Message
public boolean isConcurrentStoreAndDispatchTransactions() {
return this.concurrentStoreAndDispatchTransactions;
}
-
+
/**
* @return the maxAsyncJobs
*/
@@ -361,7 +361,7 @@ public class KahaDBStore extends Message
org.apache.activemq.util.ByteSequence packet =
wireFormat.marshal(message);
command.setMessage(new Buffer(packet.getData(),
packet.getOffset(), packet.getLength()));
store(command, isEnableJournalDiskSyncs() &&
message.isResponseRequired(), null, null);
-
+
}
public void removeMessage(ConnectionContext context, MessageAck ack)
throws IOException {
@@ -479,7 +479,7 @@ public class KahaDBStore extends Message
}
}
-
+
public void recoverNextMessages(final int maxReturned, final
MessageRecoveryListener listener) throws Exception {
indexLock.readLock().lock();
try {
@@ -534,10 +534,10 @@ public class KahaDBStore extends Message
// Hopefully one day the page file supports concurrent read
// operations... but for now we must
// externally synchronize...
-
+
indexLock.writeLock().lock();
try {
- pageFile.tx().execute(new
Transaction.Closure<IOException>() {
+ pageFile.tx().execute(new
Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException
{
StoredDestination sd = getStoredDestination(dest,
tx);
Long location = sd.messageIdIndex.get(tx, key);
@@ -546,14 +546,12 @@ public class KahaDBStore extends Message
}
}
});
- }finally {
+ } finally {
indexLock.writeLock().unlock();
}
-
} finally {
unlockAsyncJobQueue();
}
-
}
@Override
@@ -618,7 +616,7 @@ public class KahaDBStore extends Message
public void acknowledge(ConnectionContext context, String clientId,
String subscriptionName,
MessageId messageId, MessageAck ack)
throws IOException {
- String subscriptionKey = subscriptionKey(clientId,
subscriptionName);
+ String subscriptionKey = subscriptionKey(clientId,
subscriptionName).toString();
if (isConcurrentStoreAndDispatchTopics()) {
AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
StoreTopicTask task = null;
@@ -660,7 +658,7 @@ public class KahaDBStore extends Message
.getSubscriptionName());
KahaSubscriptionCommand command = new KahaSubscriptionCommand();
command.setDestination(dest);
- command.setSubscriptionKey(subscriptionKey);
+ command.setSubscriptionKey(subscriptionKey.toString());
command.setRetroactive(retroactive);
org.apache.activemq.util.ByteSequence packet =
wireFormat.marshal(subscriptionInfo);
command.setSubscriptionInfo(new Buffer(packet.getData(),
packet.getOffset(), packet.getLength()));
@@ -671,7 +669,7 @@ public class KahaDBStore extends Message
public void deleteSubscription(String clientId, String
subscriptionName) throws IOException {
KahaSubscriptionCommand command = new KahaSubscriptionCommand();
command.setDestination(dest);
- command.setSubscriptionKey(subscriptionKey(clientId,
subscriptionName));
+ command.setSubscriptionKey(subscriptionKey(clientId,
subscriptionName).toString());
store(command, isEnableJournalDiskSyncs() && true, null, null);
this.subscriptionCount.decrementAndGet();
}
@@ -730,21 +728,13 @@ public class KahaDBStore extends Message
return pageFile.tx().execute(new
Transaction.CallableClosure<Integer, IOException>() {
public Integer execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
- LastAck cursorPos = sd.subscriptionAcks.get(tx,
subscriptionKey);
+ LastAck cursorPos = getLastAck(tx, sd,
subscriptionKey);
if (cursorPos == null) {
// The subscription might not exist.
return 0;
}
- int counter = 0;
- for (Iterator<Entry<Long, HashSet<String>>> iterator =
- sd.ackPositions.iterator(tx,
cursorPos.lastAckedSequence); iterator.hasNext();) {
- Entry<Long, HashSet<String>> entry =
iterator.next();
- if (entry.getValue().contains(subscriptionKey)) {
- counter++;
- }
- }
- return counter;
+ return (int) getStoredMessageCount(tx, sd,
subscriptionKey);
}
});
}finally {
@@ -755,13 +745,14 @@ public class KahaDBStore extends Message
public void recoverSubscription(String clientId, String
subscriptionName, final MessageRecoveryListener listener)
throws Exception {
final String subscriptionKey = subscriptionKey(clientId,
subscriptionName);
+ @SuppressWarnings("unused")
final SubscriptionInfo info = lookupSubscription(clientId,
subscriptionName);
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
- LastAck cursorPos = sd.subscriptionAcks.get(tx,
subscriptionKey);
+ LastAck cursorPos = getLastAck(tx, sd,
subscriptionKey);
sd.orderIndex.setBatch(tx, cursorPos);
for (Iterator<Entry<Long, MessageKeys>> iterator =
sd.orderIndex.iterator(tx); iterator
.hasNext();) {
@@ -779,6 +770,7 @@ public class KahaDBStore extends Message
public void recoverNextMessages(String clientId, String
subscriptionName, final int maxReturned,
final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId,
subscriptionName);
+ @SuppressWarnings("unused")
final SubscriptionInfo info = lookupSubscription(clientId,
subscriptionName);
indexLock.writeLock().lock();
try {
@@ -788,7 +780,7 @@ public class KahaDBStore extends Message
sd.orderIndex.resetCursorPosition();
MessageOrderCursor moc =
sd.subscriptionCursors.get(subscriptionKey);
if (moc == null) {
- LastAck pos = sd.subscriptionAcks.get(tx,
subscriptionKey);
+ LastAck pos = getLastAck(tx, sd, subscriptionKey);
if (pos == null) {
// sub deleted
return;
@@ -858,7 +850,7 @@ public class KahaDBStore extends Message
/**
* Cleanup method to remove any state associated with the given
destination.
* This method does not stop the message store (it might not be cached).
- *
+ *
* @param destination
* Destination to forget
*/
@@ -868,7 +860,7 @@ public class KahaDBStore extends Message
/**
* Cleanup method to remove any state associated with the given destination
* This method does not stop the message store (it might not be cached).
- *
+ *
* @param destination
* Destination to forget
*/
@@ -920,7 +912,7 @@ public class KahaDBStore extends Message
public long getLastMessageBrokerSequenceId() throws IOException {
return 0;
}
-
+
public long getLastProducerSequenceId(ProducerId id) {
indexLock.readLock().lock();
try {
@@ -1184,7 +1176,7 @@ public class KahaDBStore extends Message
/**
* add a key
- *
+ *
* @param key
* @return true if all acknowledgements received
*/
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1171743&r1=1171742&r2=1171743&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Fri Sep 16 19:31:30 2011
@@ -27,8 +27,23 @@ import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.Stack;
+import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -56,11 +71,9 @@ import org.apache.activemq.util.Callback
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
-import org.apache.kahadb.util.LocationMarshaller;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.kahadb.index.BTreeIndex;
import org.apache.kahadb.index.BTreeVisitor;
+import org.apache.kahadb.index.ListIndex;
import org.apache.kahadb.journal.DataFile;
import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.journal.Location;
@@ -70,6 +83,7 @@ import org.apache.kahadb.page.Transactio
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.LocationMarshaller;
import org.apache.kahadb.util.LockFile;
import org.apache.kahadb.util.LongMarshaller;
import org.apache.kahadb.util.Marshaller;
@@ -77,6 +91,8 @@ import org.apache.kahadb.util.Sequence;
import org.apache.kahadb.util.SequenceSet;
import org.apache.kahadb.util.StringMarshaller;
import org.apache.kahadb.util.VariableMarshaller;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MessageDatabase extends ServiceSupport implements
BrokerServiceAware {
@@ -97,7 +113,7 @@ public class MessageDatabase extends Ser
static final long NOT_ACKED = -1;
static final long UNMATCHED_SEQ = -2;
- static final int VERSION = 3;
+ static final int VERSION = 4;
protected class Metadata {
protected Page<Metadata> page;
@@ -513,6 +529,7 @@ public class MessageDatabase extends Ser
}
}
+ @SuppressWarnings("unused")
private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
return TransactionIdConversion.convertToLocal(tx);
}
@@ -1185,7 +1202,7 @@ public class MessageDatabase extends Ser
sd.ackPositions.clear(tx);
sd.ackPositions.unload(tx);
- tx.free(sd.ackPositions.getPageId());
+ tx.free(sd.ackPositions.getHeadPageId());
}
String key = key(command.getDestination());
@@ -1207,10 +1224,12 @@ public class MessageDatabase extends Ser
addAckLocationForRetroactiveSub(tx, sd, ackLocation,
subscriptionKey);
}
sd.subscriptionAcks.put(tx, subscriptionKey, new
LastAck(ackLocation));
+ sd.subscriptionCache.add(subscriptionKey);
} else {
// delete the sub...
sd.subscriptions.remove(tx, subscriptionKey);
sd.subscriptionAcks.remove(tx, subscriptionKey);
+ sd.subscriptionCache.remove(subscriptionKey);
removeAckLocationsForSub(tx, sd, subscriptionKey);
}
}
@@ -1468,7 +1487,11 @@ public class MessageDatabase extends Ser
BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
BTreeIndex<String, LastAck> subscriptionAcks;
HashMap<String, MessageOrderCursor> subscriptionCursors;
- BTreeIndex<Long, HashSet<String>> ackPositions;
+ ListIndex<String, SequenceSet> ackPositions;
+
+ // Transient data used to track which Messages are no longer needed.
+ final TreeMap<Long, Long> messageReferences = new TreeMap<Long,
Long>();
+ final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
}
protected class StoredDestinationMarshaller extends
VariableMarshaller<StoredDestination> {
@@ -1483,15 +1506,43 @@ public class MessageDatabase extends Ser
value.subscriptions = new BTreeIndex<String,
KahaSubscriptionCommand>(pageFile, dataIn.readLong());
value.subscriptionAcks = new BTreeIndex<String,
LastAck>(pageFile, dataIn.readLong());
if (metadata.version >= 3) {
- value.ackPositions = new BTreeIndex<Long,
HashSet<String>>(pageFile, dataIn.readLong());
+ value.ackPositions = new ListIndex<String,
SequenceSet>(pageFile, dataIn.readLong());
} else {
// upgrade
pageFile.tx().execute(new
Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException
{
- value.ackPositions = new BTreeIndex<Long,
HashSet<String>>(pageFile, tx.allocate());
-
value.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
-
value.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
- value.ackPositions.load(tx);
+ BTreeIndex<Long, HashSet<String>> oldAckPositions =
+ new BTreeIndex<Long,
HashSet<String>>(pageFile, tx.allocate());
+
oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
+
oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
+ oldAckPositions.load(tx);
+
+ LinkedHashMap<String, SequenceSet> temp = new
LinkedHashMap<String, SequenceSet>();
+
+ // Do the initial build of the data in memory before writing into the store
+ // based Ack Positions List to avoid a lot of disk thrashing.
+ Iterator<Entry<Long, HashSet<String>>> iterator =
oldAckPositions.iterator(tx);
+ while (iterator.hasNext()) {
+ Entry<Long, HashSet<String>> entry =
iterator.next();
+
+ for(String subKey : entry.getValue()) {
+ SequenceSet pendingAcks = temp.get(subKey);
+ if (pendingAcks == null) {
+ pendingAcks = new SequenceSet();
+ temp.put(subKey, pendingAcks);
+ }
+
+ pendingAcks.add(entry.getKey());
+ }
+ }
+
+ // Now move the pending messages to ack data into the store backed
+ // structure.
+ value.ackPositions = new ListIndex<String,
SequenceSet>(pageFile, tx.allocate());
+ for(String subscriptionKey : temp.keySet()) {
+ value.ackPositions.put(tx, subscriptionKey,
temp.get(subscriptionKey));
+ }
+
}
});
}
@@ -1527,7 +1578,7 @@ public class MessageDatabase extends Ser
dataOut.writeBoolean(true);
dataOut.writeLong(value.subscriptions.getPageId());
dataOut.writeLong(value.subscriptionAcks.getPageId());
- dataOut.writeLong(value.ackPositions.getPageId());
+ dataOut.writeLong(value.ackPositions.getHeadPageId());
} else {
dataOut.writeBoolean(false);
}
@@ -1594,7 +1645,7 @@ public class MessageDatabase extends Ser
if (topic) {
rc.subscriptions = new BTreeIndex<String,
KahaSubscriptionCommand>(pageFile, tx.allocate());
rc.subscriptionAcks = new BTreeIndex<String,
LastAck>(pageFile, tx.allocate());
- rc.ackPositions = new BTreeIndex<Long,
HashSet<String>>(pageFile, tx.allocate());
+ rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile,
tx.allocate());
}
metadata.destinations.put(tx, key, rc);
}
@@ -1624,8 +1675,8 @@ public class MessageDatabase extends Ser
rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
rc.subscriptionAcks.load(tx);
- rc.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
-
rc.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
+ rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
+
rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
rc.ackPositions.load(tx);
rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
@@ -1645,6 +1696,27 @@ public class MessageDatabase extends Ser
}
}
+ // Configure the message references index
+ Iterator<Entry<String, SequenceSet>> subscriptions =
rc.ackPositions.iterator(tx);
+ while (subscriptions.hasNext()) {
+ Entry<String, SequenceSet> subscription = subscriptions.next();
+ if (subscription.getValue() != null) {
+ for(Long sequenceId : subscription.getValue()) {
+ Long current = rc.messageReferences.get(sequenceId);
+ if (current == null) {
+ current = new Long(0);
+ }
+ rc.messageReferences.put(sequenceId,
Long.valueOf(current.longValue() + 1));
+ }
+ }
+ }
+
+ // Configure the subscription cache
+ for (Iterator<Entry<String, LastAck>> iterator =
rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
+ Entry<String, LastAck> entry = iterator.next();
+ rc.subscriptionCache.add(entry.getKey());
+ }
+
if (rc.orderIndex.nextMessageId == 0) {
// check for existing durable sub all acked out - pull next
seq from acks as messages are gone
if (!rc.subscriptionAcks.isEmpty(tx)) {
@@ -1656,16 +1728,16 @@ public class MessageDatabase extends Ser
}
} else {
// update based on ackPositions for unmatched, last entry is
always the next
- if (!rc.ackPositions.isEmpty(tx)) {
- Entry<Long,HashSet<String>> last =
rc.ackPositions.getLast(tx);
+ if (!rc.messageReferences.isEmpty()) {
+ Long nextMessageId = (Long)
rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
rc.orderIndex.nextMessageId =
- Math.max(rc.orderIndex.nextMessageId, last.getKey());
+ Math.max(rc.orderIndex.nextMessageId,
nextMessageId);
}
}
}
- if (metadata.version < 3) {
+ if (metadata.version < VERSION) {
// store again after upgrade
metadata.destinations.put(tx, key, rc);
}
@@ -1673,42 +1745,105 @@ public class MessageDatabase extends Ser
}
private void addAckLocation(Transaction tx, StoredDestination sd, Long
messageSequence, String subscriptionKey) throws IOException {
- HashSet<String> hs = sd.ackPositions.get(tx, messageSequence);
- if (hs == null) {
- hs = new HashSet<String>();
- }
- hs.add(subscriptionKey);
- // every ack location addition needs to be a btree modification to get
it stored
- sd.ackPositions.put(tx, messageSequence, hs);
+ SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
+ if (sequences == null) {
+ sequences = new SequenceSet();
+ sequences.add(messageSequence);
+ sd.ackPositions.put(tx, subscriptionKey, sequences);
+ } else {
+ sequences.add(messageSequence);
+ sd.ackPositions.add(tx, subscriptionKey, sequences);
+ }
+
+ Long count = sd.messageReferences.get(messageSequence);
+ if (count == null) {
+ count = Long.valueOf(0L);
+ }
+ count = count.longValue() + 1;
+ sd.messageReferences.put(messageSequence, count);
}
// new sub is interested in potentially all existing messages
private void addAckLocationForRetroactiveSub(Transaction tx,
StoredDestination sd, Long messageSequence, String subscriptionKey) throws
IOException {
- for (Iterator<Entry<Long, HashSet<String>>> iterator =
sd.ackPositions.iterator(tx, messageSequence); iterator.hasNext(); ) {
- Entry<Long, HashSet<String>> entry = iterator.next();
- entry.getValue().add(subscriptionKey);
- sd.ackPositions.put(tx, entry.getKey(), entry.getValue());
+ SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
+ if (sequences == null) {
+ sequences = new SequenceSet();
+ sequences.add(messageSequence);
+ sd.ackPositions.put(tx, subscriptionKey, sequences);
+ } else {
+ sequences.add(messageSequence);
+ sd.ackPositions.add(tx, subscriptionKey, sequences);
+ }
+
+ Long count = sd.messageReferences.get(messageSequence);
+ if (count == null) {
+ count = Long.valueOf(0L);
}
+ count = count.longValue() + 1;
+ sd.messageReferences.put(messageSequence, count);
}
- final HashSet nextMessageIdMarker = new HashSet<String>();
// on a new message add, all existing subs are interested in this message
private void addAckLocationForNewMessage(Transaction tx, StoredDestination
sd, Long messageSequence) throws IOException {
- HashSet hs = new HashSet<String>();
- for (Iterator<Entry<String, LastAck>> iterator =
sd.subscriptionAcks.iterator(tx); iterator.hasNext();) {
- Entry<String, LastAck> entry = iterator.next();
- hs.add(entry.getKey());
- }
- sd.ackPositions.put(tx, messageSequence, hs);
- // add empty next to keep track of nextMessage
- sd.ackPositions.put(tx, messageSequence+1, nextMessageIdMarker);
+ for(String subscriptionKey : sd.subscriptionCache) {
+ SequenceSet sequences = null;
+ sequences = sd.ackPositions.get(tx, subscriptionKey);
+ if (sequences == null) {
+ sequences = new SequenceSet();
+ sequences.add(new Sequence(messageSequence, messageSequence +
1));
+ sd.ackPositions.put(tx, subscriptionKey, sequences);
+ } else {
+ sequences.add(new Sequence(messageSequence, messageSequence +
1));
+ sd.ackPositions.add(tx, subscriptionKey, sequences);
+ }
+
+ Long count = sd.messageReferences.get(messageSequence);
+ if (count == null) {
+ count = Long.valueOf(0L);
+ }
+ count = count.longValue() + 1;
+ sd.messageReferences.put(messageSequence, count);
+ sd.messageReferences.put(messageSequence+1, Long.valueOf(0L));
+ }
}
private void removeAckLocationsForSub(Transaction tx, StoredDestination
sd, String subscriptionKey) throws IOException {
if (!sd.ackPositions.isEmpty(tx)) {
- Long end = sd.ackPositions.getLast(tx).getKey();
- for (Long sequence = sd.ackPositions.getFirst(tx).getKey();
sequence <= end; sequence++) {
- removeAckLocation(tx, sd, subscriptionKey, sequence);
+ SequenceSet sequences = sd.ackPositions.remove(tx,
subscriptionKey);
+ if (sequences == null || sequences.isEmpty()) {
+ return;
+ }
+
+ ArrayList<Long> unreferenced = new ArrayList<Long>();
+
+ for(Long sequenceId : sequences) {
+ long references = 0;
+ Long count = sd.messageReferences.get(sequenceId);
+ if (count != null) {
+ references = count.longValue() - 1;
+ } else {
+ continue;
+ }
+
+ if (references > 0) {
+ sd.messageReferences.put(sequenceId,
Long.valueOf(references));
+ } else {
+ sd.messageReferences.remove(sequenceId);
+ unreferenced.add(sequenceId);
+ }
+ }
+
+ for(Long sequenceId : unreferenced) {
+ // Find all the entries that need to get deleted.
+ ArrayList<Entry<Long, MessageKeys>> deletes = new
ArrayList<Entry<Long, MessageKeys>>();
+ sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
+
+ // Do the actual deletes.
+ for (Entry<Long, MessageKeys> entry : deletes) {
+ sd.locationIndex.remove(tx, entry.getValue().location);
+ sd.messageIdIndex.remove(tx, entry.getValue().messageId);
+ sd.orderIndex.remove(tx, entry.getKey());
+ }
}
}
}
@@ -1723,31 +1858,54 @@ public class MessageDatabase extends Ser
private void removeAckLocation(Transaction tx, StoredDestination sd,
String subscriptionKey, Long sequenceId) throws IOException {
// Remove the sub from the previous location set..
if (sequenceId != null) {
- HashSet<String> hs = sd.ackPositions.get(tx, sequenceId);
- if (hs != null) {
- hs.remove(subscriptionKey);
- if (hs.isEmpty()) {
- HashSet<String> firstSet =
sd.ackPositions.getFirst(tx).getValue();
- sd.ackPositions.remove(tx, sequenceId);
-
- // Find all the entries that need to get deleted.
- ArrayList<Entry<Long, MessageKeys>> deletes = new
ArrayList<Entry<Long, MessageKeys>>();
- sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
-
- // Do the actual deletes.
- for (Entry<Long, MessageKeys> entry : deletes) {
- sd.locationIndex.remove(tx, entry.getValue().location);
- sd.messageIdIndex.remove(tx,
entry.getValue().messageId);
- sd.orderIndex.remove(tx, entry.getKey());
- }
+ SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
+ if (range != null && !range.isEmpty()) {
+ range.remove(sequenceId);
+ if (!range.isEmpty()) {
+ sd.ackPositions.put(tx, subscriptionKey, range);
} else {
- // update
- sd.ackPositions.put(tx, sequenceId, hs);
+ sd.ackPositions.remove(tx, subscriptionKey);
+ }
+
+ // Check if the message is referenced by any other subscription.
+ Long count = sd.messageReferences.get(sequenceId);
+ long references = count.longValue() - 1;
+ if (references > 0) {
+ sd.messageReferences.put(sequenceId,
Long.valueOf(references));
+ return;
+ } else {
+ sd.messageReferences.remove(sequenceId);
+ }
+
+ // Find all the entries that need to get deleted.
+ ArrayList<Entry<Long, MessageKeys>> deletes = new
ArrayList<Entry<Long, MessageKeys>>();
+ sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
+
+ // Do the actual deletes.
+ for (Entry<Long, MessageKeys> entry : deletes) {
+ sd.locationIndex.remove(tx, entry.getValue().location);
+ sd.messageIdIndex.remove(tx, entry.getValue().messageId);
+ sd.orderIndex.remove(tx, entry.getKey());
}
}
}
}
+ public LastAck getLastAck(Transaction tx, StoredDestination sd, String
subscriptionKey) throws IOException {
+ return sd.subscriptionAcks.get(tx, subscriptionKey);
+ }
+
+ public long getStoredMessageCount(Transaction tx, StoredDestination sd,
String subscriptionKey) throws IOException {
+ SequenceSet messageSequences = sd.ackPositions.get(tx,
subscriptionKey);
+ if (messageSequences != null) {
+ long result = messageSequences.rangeSize();
+ // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
+ return result > 0 ? result - 1 : 0;
+ }
+
+ return 0;
+ }
+
private String key(KahaDestination destination) {
return destination.getType().getNumber() + ":" + destination.getName();
}
@@ -1799,6 +1957,7 @@ public class MessageDatabase extends Ser
return tx;
}
+ @SuppressWarnings("unused")
private TransactionId key(KahaTransactionInfo transactionInfo) {
return TransactionIdConversion.convert(transactionInfo);
}
@@ -2452,6 +2611,7 @@ public class MessageDatabase extends Ser
dataOut.write(data);
}
+ @SuppressWarnings("unchecked")
public HashSet<String> readPayload(DataInput dataIn) throws
IOException {
int dataLen = dataIn.readInt();
byte[] data = new byte[dataLen];