Author: tabish
Date: Tue Sep 20 15:22:53 2011
New Revision: 1173188
URL: http://svn.apache.org/viewvc?rev=1173188&view=rev
Log:
change for https://issues.apache.org/jira/browse/AMQ-3467
fix and issue created when the new changes were merged.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1173188&r1=1173187&r2=1173188&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Tue Sep 20 15:22:53 2011
@@ -1754,7 +1754,6 @@ public class MessageDatabase extends Ser
Math.max(rc.orderIndex.nextMessageId,
nextMessageId);
}
}
-
}
if (metadata.version < VERSION) {
@@ -1769,10 +1768,10 @@ public class MessageDatabase extends Ser
if (sequences == null) {
sequences = new SequenceSet();
sequences.add(messageSequence);
- sd.ackPositions.put(tx, subscriptionKey, sequences);
+ sd.ackPositions.add(tx, subscriptionKey, sequences);
} else {
sequences.add(messageSequence);
- sd.ackPositions.add(tx, subscriptionKey, sequences);
+ sd.ackPositions.put(tx, subscriptionKey, sequences);
}
Long count = sd.messageReferences.get(messageSequence);
@@ -1789,10 +1788,10 @@ public class MessageDatabase extends Ser
if (sequences == null) {
sequences = new SequenceSet();
sequences.add(messageSequence);
- sd.ackPositions.put(tx, subscriptionKey, sequences);
+ sd.ackPositions.add(tx, subscriptionKey, sequences);
} else {
sequences.add(messageSequence);
- sd.ackPositions.add(tx, subscriptionKey, sequences);
+ sd.ackPositions.put(tx, subscriptionKey, sequences);
}
Long count = sd.messageReferences.get(messageSequence);
@@ -1811,10 +1810,10 @@ public class MessageDatabase extends Ser
if (sequences == null) {
sequences = new SequenceSet();
sequences.add(new Sequence(messageSequence, messageSequence +
1));
- sd.ackPositions.put(tx, subscriptionKey, sequences);
+ sd.ackPositions.add(tx, subscriptionKey, sequences);
} else {
sequences.add(new Sequence(messageSequence, messageSequence +
1));
- sd.ackPositions.add(tx, subscriptionKey, sequences);
+ sd.ackPositions.put(tx, subscriptionKey, sequences);
}
Long count = sd.messageReferences.get(messageSequence);
@@ -1875,12 +1874,12 @@ public class MessageDatabase extends Ser
* @param sequenceId
* @throws IOException
*/
- private void removeAckLocation(Transaction tx, StoredDestination sd,
String subscriptionKey, Long sequenceId) throws IOException {
+ private void removeAckLocation(Transaction tx, StoredDestination sd,
String subscriptionKey, Long messageSequence) throws IOException {
// Remove the sub from the previous location set..
- if (sequenceId != null) {
+ if (messageSequence != null) {
SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
if (range != null && !range.isEmpty()) {
- range.remove(sequenceId);
+ range.remove(messageSequence);
if (!range.isEmpty()) {
sd.ackPositions.put(tx, subscriptionKey, range);
} else {
@@ -1888,18 +1887,18 @@ public class MessageDatabase extends Ser
}
// Check if the message is reference by any other subscription.
- Long count = sd.messageReferences.get(sequenceId);
+ Long count = sd.messageReferences.get(messageSequence);
long references = count.longValue() - 1;
if (references > 0) {
- sd.messageReferences.put(sequenceId,
Long.valueOf(references));
+ sd.messageReferences.put(messageSequence,
Long.valueOf(references));
return;
} else {
- sd.messageReferences.remove(sequenceId);
+ sd.messageReferences.remove(messageSequence);
}
// Find all the entries that need to get deleted.
ArrayList<Entry<Long, MessageKeys>> deletes = new
ArrayList<Entry<Long, MessageKeys>>();
- sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
+ sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
// Do the actual deletes.
for (Entry<Long, MessageKeys> entry : deletes) {