Author: chirino
Date: Fri Sep 5 11:07:09 2008
New Revision: 692502
URL: http://svn.apache.org/viewvc?rev=692502&view=rev
Log:
Implemented the topic side of the MesageSTore.
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/Visitor.java
activemq/sandbox/kahadb/src/main/proto/journal-data.proto
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java?rev=692502&r1=692501&r2=692502&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
Fri Sep 5 11:07:09 2008
@@ -42,13 +42,18 @@
public Location() {
}
- Location(Location item) {
+ public Location(Location item) {
this.dataFileId = item.dataFileId;
this.offset = item.offset;
this.size = item.size;
this.type = item.type;
}
+ public Location(int dataFileId, int offset) {
+ this.dataFileId=dataFileId;
+ this.offset=offset;
+ }
+
boolean isValid() {
return dataFileId != NOT_SET;
}
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java?rev=692502&r1=692501&r2=692502&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
Fri Sep 5 11:07:09 2008
@@ -22,6 +22,7 @@
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
@@ -53,9 +54,12 @@
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
+import org.apache.kahadb.StringMarshaller;
+import org.apache.kahadb.index.BTreeIndex;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.store.MessageDatabase.AddOpperation;
+import org.apache.kahadb.store.MessageDatabase.LocationMarshaller;
import org.apache.kahadb.store.MessageDatabase.Operation;
import org.apache.kahadb.store.MessageDatabase.StoredDestination;
import org.apache.kahadb.store.data.KahaAddMessageCommand;
@@ -68,22 +72,18 @@
import org.apache.kahadb.store.data.KahaRemoveMessageCommand;
import org.apache.kahadb.store.data.KahaRollbackCommand;
import org.apache.kahadb.store.data.KahaTransactionInfo;
+import org.apache.kahadb.store.data.KahaSubscriptionCommand;
import org.apache.kahadb.store.data.KahaXATransactionId;
import org.apache.kahadb.store.data.KahaDestination.DestinationType;
public class KahaDBPersistenceAdaptor extends MessageDatabase implements
PersistenceAdapter {
- private String brokerName;
- private SystemUsage usageManager;
private WireFormat wireFormat = new OpenWireFormat();
private AtomicBoolean started = new AtomicBoolean();
public void setBrokerName(String brokerName) {
- this.brokerName = brokerName;
}
-
public void setUsageManager(SystemUsage usageManager) {
- this.usageManager = usageManager;
}
public void start() throws Exception {
@@ -141,9 +141,9 @@
};
}
- class KahaDBMessageStore implements MessageStore {
+ public class KahaDBMessageStore implements MessageStore {
private final ActiveMQDestination destination;
- private KahaDestination dest;
+ protected KahaDestination dest;
public KahaDBMessageStore(ActiveMQDestination destination) {
this.destination = destination;
@@ -252,8 +252,7 @@
if( entry!=null ) {
// Copy the location, cause the iterator gives us
the key by reference.. changing it
// would mess up the index.
- cursorPos = new Location();
-
cursorPos.setDataFileId(entry.getKey().getDataFileId());
+ cursorPos = new Location(entry.getKey());
cursorPos.setOffset(entry.getKey().getOffset()+1 );
}
}
@@ -273,50 +272,184 @@
}
}
-
+
class KahaDBTopicMessageStore extends KahaDBMessageStore implements
TopicMessageStore {
public KahaDBTopicMessageStore(ActiveMQTopic destination) {
super(destination);
}
-
+
public void acknowledge(ConnectionContext context, String clientId,
String subscriptionName, MessageId messageId) throws IOException {
+ KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
+ command.setDestination(dest);
+ command.setSubscriptionKey(subscriptionKey(clientId,
subscriptionName));
+ command.setMessageId(messageId.toString());
+ // We are not passed a transaction info.. so we can't participate
in a transaction.
+ // Looks like a design issue with the TopicMessageStore interface.
Also we can't recover the original ack
+ // to pass back to the XA recover method.
+ // command.setTransactionInfo();
+ store(command, true);
}
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean
retroactive) throws IOException {
+ String subscriptionKey =
subscriptionKey(subscriptionInfo.getClientId(),
subscriptionInfo.getSubscriptionName());
+ KahaSubscriptionCommand command = new KahaSubscriptionCommand();
+ command.setDestination(dest);
+ command.setSubscriptionKey(subscriptionKey);
+ command.setRetroactive(retroactive);
+ ByteSequence packet = wireFormat.marshal(subscriptionInfo);
+ command.setSubscriptionInfo(ByteString.copyFrom(packet.getData(),
packet.getOffset(), packet.getLength()));
+ store(command, true);
}
public void deleteSubscription(String clientId, String
subscriptionName) throws IOException {
+ KahaSubscriptionCommand command = new KahaSubscriptionCommand();
+ command.setDestination(dest);
+ command.setSubscriptionKey(subscriptionKey(clientId,
subscriptionName));
+ store(command, true);
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
- return null;
- }
+
+ final ArrayList<SubscriptionInfo> subscriptions = new
ArrayList<SubscriptionInfo>();
+ synchronized(indexMutex) {
+ pageFile.tx().execute(new Transaction.Closure<IOException>(){
+ public void execute(Transaction tx) throws IOException {
+ StoredDestination sd = getStoredDestination(dest, tx);
+ for (Iterator<Entry<String, KahaSubscriptionCommand>>
iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) {
+ Entry<String, KahaSubscriptionCommand> entry =
iterator.next();
+ SubscriptionInfo info =
(SubscriptionInfo)wireFormat.unmarshal( new
DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) );
+ subscriptions.add(info);
- public int getMessageCount(String clientId, String subscriberName)
throws IOException {
- return 0;
+ }
+ }
+ });
+ }
+
+ SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()];
+ subscriptions.toArray(rc);
+ return rc;
}
public SubscriptionInfo lookupSubscription(String clientId, String
subscriptionName) throws IOException {
- return null;
+ final String subscriptionKey = subscriptionKey(clientId,
subscriptionName);
+ synchronized(indexMutex) {
+ return pageFile.tx().execute(new
Transaction.CallableClosure<SubscriptionInfo, IOException>(){
+ public SubscriptionInfo execute(Transaction tx) throws
IOException {
+ StoredDestination sd = getStoredDestination(dest, tx);
+ KahaSubscriptionCommand command =
sd.subscriptions.get(tx, subscriptionKey);
+ if( command ==null ) {
+ return null;
+ }
+ return (SubscriptionInfo)wireFormat.unmarshal( new
DataInputStream(command.getSubscriptionInfo().newInput()) );
+ }
+ });
+ }
+ }
+
+ public int getMessageCount(String clientId, String subscriptionName)
throws IOException {
+ final String subscriptionKey = subscriptionKey(clientId,
subscriptionName);
+ synchronized(indexMutex) {
+ return pageFile.tx().execute(new
Transaction.CallableClosure<Integer, IOException>(){
+ public Integer execute(Transaction tx) throws IOException {
+ StoredDestination sd = getStoredDestination(dest, tx);
+ Location cursorPos = sd.subscriptionAcks.get(tx,
subscriptionKey);
+ if ( cursorPos==null ) {
+ // The subscription might not exist.
+ return 0;
+ }
+ cursorPos = new Location(cursorPos);
+ cursorPos.setOffset(cursorPos.getOffset()+1 );
+
+ int counter = 0;
+ for (Iterator<Entry<Location, String>> iterator =
sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+ iterator.next();
+ counter++;
+ }
+ return counter;
+ }
+ });
+ }
}
- public void recoverNextMessages(String clientId, String
subscriptionName, int maxReturned, MessageRecoveryListener listener) throws
Exception {
+ public void recoverSubscription(String clientId, String
subscriptionName, final MessageRecoveryListener listener) throws Exception {
+ final String subscriptionKey = subscriptionKey(clientId,
subscriptionName);
+ synchronized(indexMutex) {
+ pageFile.tx().execute(new Transaction.Closure<Exception>(){
+ public void execute(Transaction tx) throws Exception {
+ StoredDestination sd = getStoredDestination(dest, tx);
+ Location cursorPos = new
Location(sd.subscriptionAcks.get(tx, subscriptionKey));
+ cursorPos.setOffset(cursorPos.getOffset()+1 );
+
+ for (Iterator<Entry<Location, String>> iterator =
sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+ Entry<Location, String> entry = iterator.next();
+ listener.recoverMessage(
loadMessage(entry.getKey() ) );
+ }
+ }
+ });
+ }
}
- public void recoverSubscription(String clientId, String
subscriptionName, MessageRecoveryListener listener) throws Exception {
+ public void recoverNextMessages(String clientId, String
subscriptionName, final int maxReturned, final MessageRecoveryListener
listener) throws Exception {
+ final String subscriptionKey = subscriptionKey(clientId,
subscriptionName);
+ synchronized(indexMutex) {
+ pageFile.tx().execute(new Transaction.Closure<Exception>(){
+ public void execute(Transaction tx) throws Exception {
+ StoredDestination sd = getStoredDestination(dest, tx);
+ Location cursorPos =
sd.subscriptionCursors.get(subscriptionKey);
+ if( cursorPos == null ) {
+ cursorPos = new
Location(sd.subscriptionAcks.get(tx, subscriptionKey));
+ cursorPos.setOffset(cursorPos.getOffset()+1 );
+ }
+
+ Entry<Location, String> entry=null;
+ int counter = 0;
+ for (Iterator<Entry<Location, String>> iterator =
sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+ entry = iterator.next();
+ listener.recoverMessage(
loadMessage(entry.getKey() ) );
+ counter++;
+ if( counter >= maxReturned ) {
+ break;
+ }
+ }
+ if( entry!=null ) {
+ // Copy the location, cause the iterator gives us
the key by reference.. changing it
+ // would mess up the index.
+ cursorPos = new Location(entry.getKey());
+ cursorPos.setOffset(entry.getKey().getOffset()+1 );
+ sd.subscriptionCursors.put(subscriptionKey,
cursorPos);
+ }
+ }
+ });
+ }
}
public void resetBatching(String clientId, String subscriptionName) {
+ try {
+ final String subscriptionKey = subscriptionKey(clientId,
subscriptionName);
+ synchronized(indexMutex) {
+ pageFile.tx().execute(new
Transaction.Closure<IOException>(){
+ public void execute(Transaction tx) throws IOException
{
+ StoredDestination sd = getStoredDestination(dest,
tx);
+ sd.subscriptionCursors.remove(subscriptionKey);
+ }
+ });
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
}
+ String subscriptionKey(String clientId, String subscriptionName){
+ return clientId+":"+subscriptionName;
+ }
+
public MessageStore createQueueMessageStore(ActiveMQQueue destination)
throws IOException {
return new KahaDBMessageStore(destination);
}
public TopicMessageStore createTopicMessageStore(ActiveMQTopic
destination) throws IOException {
- throw new IOException("Not yet implemented.");
-// return new KahaDBTopicMessageStore(destination);
+ return new KahaDBTopicMessageStore(destination);
}
public void deleteAllMessages() throws IOException {
@@ -385,7 +518,7 @@
* @return
* @throws IOException
*/
- private Message loadMessage(Location location) throws IOException {
+ Message loadMessage(Location location) throws IOException {
KahaAddMessageCommand addMessage =
(KahaAddMessageCommand)load(location);
Message msg = (Message)wireFormat.unmarshal( new
DataInputStream(addMessage.getMessage().newInput()) );
return msg;
@@ -395,7 +528,7 @@
// Internal conversion methods.
///////////////////////////////////////////////////////////////////
- private KahaTransactionInfo createTransactionInfo(TransactionId txid) {
+ KahaTransactionInfo createTransactionInfo(TransactionId txid) {
if( txid ==null ) {
return null;
}
@@ -424,14 +557,14 @@
return rc;
}
- private KahaLocation convert(Location location) {
+ KahaLocation convert(Location location) {
KahaLocation rc = new KahaLocation();
rc.setLogId(location.getDataFileId());
rc.setOffset(location.getOffset());
return rc;
}
- private KahaDestination convert(ActiveMQDestination dest) {
+ KahaDestination convert(ActiveMQDestination dest) {
KahaDestination rc = new KahaDestination();
rc.setName(dest.getPhysicalName());
switch( dest.getDestinationType() ) {
@@ -452,7 +585,7 @@
}
}
- private ActiveMQDestination convert(String dest) {
+ ActiveMQDestination convert(String dest) {
int p = dest.indexOf(":");
if( p<0 ) {
throw new IllegalArgumentException("Not in the valid destination
format");
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=692502&r1=692501&r2=692502&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
Fri Sep 5 11:07:09 2008
@@ -20,13 +20,21 @@
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.TreeMap;
+import java.util.Map.Entry;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.commons.logging.Log;
@@ -48,6 +56,7 @@
import org.apache.kahadb.store.data.KahaRemoveDestinationCommand;
import org.apache.kahadb.store.data.KahaRemoveMessageCommand;
import org.apache.kahadb.store.data.KahaRollbackCommand;
+import org.apache.kahadb.store.data.KahaSubscriptionCommand;
import org.apache.kahadb.store.data.KahaTraceCommand;
import org.apache.kahadb.store.data.KahaTransactionInfo;
import org.apache.kahadb.store.data.KahaXATransactionId;
@@ -68,15 +77,21 @@
protected Page<Metadata> page;
protected int state;
protected BTreeIndex<String, StoredDestination> destinations;
-
+ protected Location lastUpdate;
+ protected Location firstInProgressTransactionLocation;
+
public void read(DataInput is) throws IOException {
state = is.readInt();
destinations = new BTreeIndex<String, StoredDestination>(pageFile,
is.readLong());
+ lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
+ firstInProgressTransactionLocation =
LocationMarshaller.INSTANCE.readPayload(is);
}
public void write(DataOutput os) throws IOException {
os.writeInt(state);
os.writeLong(destinations.getPageId());
+ LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
+
LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation,
os);
}
}
@@ -160,6 +175,9 @@
metadata.page = page;
metadata.state = CLOSED_STATE;
metadata.destinations = new BTreeIndex<String,
StoredDestination>(pageFile, tx.allocate().getPageId());
+ metadata.lastUpdate = new Location(0,0);
+ metadata.firstInProgressTransactionLocation = new
Location(0,0);
+
tx.store(metadata.page, metadataMarshaller, true);
} else {
Page<Metadata> page = tx.load(0, metadataMarshaller);
@@ -245,9 +263,10 @@
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
os.writeByte(data.type().getNumber());
data.writeTo(os);
- Location result = asyncDataManager.write(os.getByteSequence(), sync);
- process(data, result);
- return result;
+ Location location = asyncDataManager.write(os.getByteSequence(), sync);
+ process(data, location);
+ metadata.lastUpdate = location;
+ return location;
}
@@ -304,6 +323,11 @@
public void visit(KahaRemoveDestinationCommand command) throws
IOException {
process(command, location);
}
+
+ @Override
+ public void visit(KahaSubscriptionCommand command) throws
IOException {
+ process(command, location);
+ }
});
}
@@ -348,6 +372,16 @@
}
}
+ protected void process(final KahaSubscriptionCommand command, final
Location location) throws IOException {
+ synchronized(indexMutex) {
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ updateIndex(tx, command, location);
+ }
+ });
+ }
+ }
+
protected void process(KahaCommitCommand command, Location location)
throws IOException {
TransactionId key = key(command.getTransactionInfo());
ArrayList<Operation> inflightTx = inflightTransactions.remove(key);
@@ -385,7 +419,7 @@
preparedTransactions.remove(key);
}
}
-
+
///////////////////////////////////////////////////////////////////
// These methods do the actual index updates.
///////////////////////////////////////////////////////////////////
@@ -394,16 +428,43 @@
private void upadateIndex(Transaction tx, KahaAddMessageCommand command,
Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(),
tx);
+
+ // Skip adding the message to the index if this is a topic and there
are no subscriptions.
+ if( sd.subscriptions!=null && sd.ackLocations.isEmpty() ) {
+ return;
+ }
+
+ // Add the message.
sd.orderIndex.put(tx, location, command.getMessageId());
sd.messageIdIndex.put(tx, command.getMessageId(), location);
}
- private void updateIndex(Transaction tx, KahaRemoveMessageCommand command,
Location location) throws IOException {
+ private void updateIndex(Transaction tx, KahaRemoveMessageCommand command,
Location ackLocation) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(),
tx);
- Location l = sd.messageIdIndex.remove(tx, command.getMessageId());
- if( l!=null ) {
- sd.orderIndex.remove(tx, l);
+ if( !command.hasSubscriptionKey() ) {
+ // In the queue case we just remove the message from the index..
+ Location messageLocation = sd.messageIdIndex.remove(tx,
command.getMessageId());
+ if( messageLocation!=null ) {
+ sd.orderIndex.remove(tx, messageLocation);
+ }
+ } else {
+ // In the topic case we need remove the message once it's been
acked by all the subs
+ Location messageLocation = sd.messageIdIndex.get(tx,
command.getMessageId());
+
+ // Make sure it's a valid message id...
+ if( messageLocation!=null ) {
+ String subscriptionKey = command.getSubscriptionKey();
+ Location prev = sd.subscriptionAcks.put(tx, subscriptionKey,
messageLocation);
+
+ // The following method handles deleting un-referenced
messages.
+ removeAckLocation(tx, sd, subscriptionKey, prev);
+
+ // Add it to the new location set.
+ addAckLocation(sd, messageLocation, subscriptionKey);
+ }
+
}
+ metadata.lastUpdate = ackLocation;
}
private void updateIndex(Transaction tx, KahaRemoveDestinationCommand
command, Location location) throws IOException {
@@ -415,10 +476,47 @@
tx.free(sd.orderIndex.getPageId());
tx.free(sd.messageIdIndex.getPageId());
+ if( sd.subscriptions!=null ) {
+ sd.subscriptions.clear(tx);
+ sd.subscriptionAcks.clear(tx);
+ sd.subscriptions.unload();
+ sd.subscriptionAcks.unload();
+ tx.free(sd.subscriptions.getPageId());
+ tx.free(sd.subscriptionAcks.getPageId());
+ }
+
String key = key(command.getDestination());
storedDestinations.remove(key);
metadata.destinations.remove(tx, key);
}
+
+ private void updateIndex(Transaction tx, KahaSubscriptionCommand command,
Location location) throws IOException {
+ StoredDestination sd = getStoredDestination(command.getDestination(),
tx);
+
+ // If set then we are creating it.. otherwise we are destroying the sub
+ if( command.hasSubscriptionInfo() ) {
+ String subscriptionKey = command.getSubscriptionKey();
+ sd.subscriptions.put(tx, subscriptionKey, command);
+ Location ackLocation;
+ if( command.getRetroactive() ) {
+ ackLocation = new Location(0,0);
+ } else {
+ ackLocation = location;
+ }
+
+ sd.subscriptionAcks.put(tx, subscriptionKey, ackLocation);
+ addAckLocation(sd, ackLocation, subscriptionKey);
+ } else {
+ // delete the sub...
+ String subscriptionKey = command.getSubscriptionKey();
+ sd.subscriptions.remove(tx, subscriptionKey);
+ Location prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
+ removeAckLocation(tx, sd, subscriptionKey, prev);
+ }
+
+ }
+
+
///////////////////////////////////////////////////////////////////
// StoredDestination related implementation methods.
@@ -426,9 +524,22 @@
private final HashMap<String, StoredDestination> storedDestinations = new
HashMap<String, StoredDestination>();
+ class StoredSubscription {
+ SubscriptionInfo subscriptionInfo;
+ String lastAckId;
+ Location lastAckLocation;
+ Location cursor;
+ }
+
static class StoredDestination {
BTreeIndex<Location, String> orderIndex;
BTreeIndex<String, Location> messageIdIndex;
+
+ // These bits are only set for Topics
+ BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
+ BTreeIndex<String, Location> subscriptionAcks;
+ HashMap<String, Location> subscriptionCursors;
+ TreeMap<Location, HashSet<String>> ackLocations;
}
protected class StoredDestinationMarshaller implements
Marshaller<StoredDestination> {
@@ -437,15 +548,27 @@
}
public StoredDestination readPayload(DataInput dataIn) throws
IOException {
- StoredDestination rc = new StoredDestination();
- rc.orderIndex = new BTreeIndex<Location, String>(pageFile,
dataIn.readLong());
- rc.messageIdIndex = new BTreeIndex<String, Location>(pageFile,
dataIn.readLong());
- return rc;
- }
-
- public void writePayload(StoredDestination object, DataOutput dataOut)
throws IOException {
- dataOut.writeLong(object.orderIndex.getPageId());
- dataOut.writeLong(object.messageIdIndex.getPageId());
+ StoredDestination value = new StoredDestination();
+ value.orderIndex = new BTreeIndex<Location, String>(pageFile,
dataIn.readLong());
+ value.messageIdIndex = new BTreeIndex<String, Location>(pageFile,
dataIn.readLong());
+
+ if( dataIn.readBoolean() ) {
+ value.subscriptions = new BTreeIndex<String,
KahaSubscriptionCommand>(pageFile, dataIn.readLong());
+ value.subscriptionAcks = new BTreeIndex<String,
Location>(pageFile, dataIn.readLong());
+ }
+ return value;
+ }
+
+ public void writePayload(StoredDestination value, DataOutput dataOut)
throws IOException {
+ dataOut.writeLong(value.orderIndex.getPageId());
+ dataOut.writeLong(value.messageIdIndex.getPageId());
+ if( value.subscriptions !=null ) {
+ dataOut.writeBoolean(true);
+ dataOut.writeLong(value.subscriptions.getPageId());
+ dataOut.writeLong(value.subscriptionAcks.getPageId());
+ } else {
+ dataOut.writeBoolean(false);
+ }
}
}
@@ -468,6 +591,24 @@
dataOut.writeInt(object.getOffset());
}
}
+
+ static class KahaSubscriptionCommandMarshaller implements
Marshaller<KahaSubscriptionCommand> {
+ final static KahaSubscriptionCommandMarshaller INSTANCE = new
KahaSubscriptionCommandMarshaller();
+
+ public Class<KahaSubscriptionCommand> getType() {
+ return KahaSubscriptionCommand.class;
+ }
+
+ public KahaSubscriptionCommand readPayload(DataInput dataIn) throws
IOException {
+ KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
+ rc.mergeFrom((InputStream)dataIn);
+ return rc;
+ }
+
+ public void writePayload(KahaSubscriptionCommand object, DataOutput
dataOut) throws IOException {
+ object.writeTo((OutputStream)dataOut);
+ }
+ }
protected StoredDestination getStoredDestination(KahaDestination
destination, Transaction tx) throws IOException {
String key = key(destination);
@@ -480,16 +621,43 @@
rc = new StoredDestination();
rc.orderIndex = new BTreeIndex<Location, String>(pageFile,
tx.allocate());
rc.messageIdIndex = new BTreeIndex<String, Location>(pageFile,
tx.allocate());
+
+ if( destination.getType() ==
KahaDestination.DestinationType.TOPIC || destination.getType() ==
KahaDestination.DestinationType.TEMP_TOPIC ) {
+ rc.subscriptions = new BTreeIndex<String,
KahaSubscriptionCommand>(pageFile, tx.allocate());
+ rc.subscriptionAcks = new BTreeIndex<String,
Location>(pageFile, tx.allocate());
+ }
}
// Configure the marshalers and load.
rc.orderIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
rc.orderIndex.setValueMarshaller(StringMarshaller.INSTANCE);
rc.orderIndex.load();
+
rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
rc.messageIdIndex.setValueMarshaller(LocationMarshaller.INSTANCE);
rc.messageIdIndex.load();
+ // If it was a topic...
+ if( rc.subscriptions!=null ) {
+
+ rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
+
rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
+ rc.subscriptions.load();
+
+
rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
+
rc.subscriptionAcks.setValueMarshaller(LocationMarshaller.INSTANCE);
+ rc.subscriptionAcks.load();
+
+ rc.ackLocations = new TreeMap<Location, HashSet<String>>();
+ rc.subscriptionCursors = new HashMap<String, Location>();
+
+ for (Iterator<Entry<String, Location>> iterator =
rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
+ Entry<String,Location> entry = iterator.next();
+ addAckLocation(rc, entry.getValue(), entry.getKey());
+ }
+
+ }
+
// Cache it. We may want to remove/unload destinations from the
cache that are not used for a while
// to reduce memory usage.
storedDestinations.put(key, rc);
@@ -497,6 +665,67 @@
return rc;
}
+ /**
+ * @param sd
+ * @param messageLocation
+ * @param subscriptionKey
+ */
+ private void addAckLocation(StoredDestination sd, Location
messageLocation, String subscriptionKey) {
+ HashSet<String> hs = sd.ackLocations.get(messageLocation);
+ if( hs == null ) {
+ hs = new HashSet<String>();
+ sd.ackLocations.put(messageLocation, hs);
+ }
+ hs.add(subscriptionKey);
+ }
+
+
+ /**
+ * @param tx
+ * @param sd
+ * @param subscriptionKey
+ * @param location
+ * @throws IOException
+ */
+ private void removeAckLocation(Transaction tx, StoredDestination sd,
String subscriptionKey, Location location) throws IOException {
+ // Remove the sub from the previous location set..
+ if( location!=null ) {
+ HashSet<String> hs = sd.ackLocations.get(location);
+ if(hs!=null) {
+ hs.remove(subscriptionKey);
+ if( hs.isEmpty() ) {
+ HashSet<String> firstSet =
sd.ackLocations.values().iterator().next();
+ sd.ackLocations.remove(location);
+
+ // Did we just empty out the first set in the
+ // ordered list of ack locations? Then it's time to
+ // delete some messages.
+ if( hs==firstSet ) {
+
+
+ // Find all the entries that need to get deleted.
+ ArrayList<Entry<Location, String>> deletes = new
ArrayList<Entry<Location, String>>();
+ for (Iterator<Entry<Location, String>> iterator =
sd.orderIndex.iterator(tx); iterator.hasNext();) {
+ Entry<Location, String> entry = iterator.next();
+ while( entry.getKey().compareTo(location) <= 0 ) {
+ // We don't do the actually delete while we
are iterating the BTree since
+ // iterating would fail.
+ deletes.add(entry);
+ }
+ }
+
+ // Do the actual deletes.
+ for (Entry<Location, String> entry : deletes) {
+ sd.messageIdIndex.remove(tx, entry.getValue());
+ sd.orderIndex.remove(tx, entry.getKey());
+ }
+
+ }
+ }
+ }
+ }
+ }
+
private String key(KahaDestination destination) {
return destination.getType().getNumber()+":"+destination.getName();
}
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/Visitor.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/Visitor.java?rev=692502&r1=692501&r2=692502&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/Visitor.java
(original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/Visitor.java
Fri Sep 5 11:07:09 2008
@@ -25,6 +25,7 @@
import org.apache.kahadb.store.data.KahaRemoveMessageCommand;
import org.apache.kahadb.store.data.KahaRollbackCommand;
import org.apache.kahadb.store.data.KahaTraceCommand;
+import org.apache.kahadb.store.data.KahaSubscriptionCommand;
public class Visitor {
@@ -49,4 +50,7 @@
public void visit(KahaRemoveDestinationCommand command) throws IOException
{
}
+ public void visit(KahaSubscriptionCommand kahaUpdateSubscriptionCommand)
throws IOException {
+ }
+
}
Modified: activemq/sandbox/kahadb/src/main/proto/journal-data.proto
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/proto/journal-data.proto?rev=692502&r1=692501&r2=692502&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/proto/journal-data.proto (original)
+++ activemq/sandbox/kahadb/src/main/proto/journal-data.proto Fri Sep 5
11:07:09 2008
@@ -28,6 +28,7 @@
KAHA_COMMIT_COMMAND = 4;
KAHA_ROLLBACK_COMMAND = 5;
KAHA_REMOVE_DESTINATION_COMMAND = 6;
+ KAHA_SUBSCRIPTION_COMMAND = 7;
}
message KahaTraceCommand {
@@ -61,7 +62,8 @@
optional KahaTransactionInfo transaction_info=1;
required KahaDestination destination = 2;
required string messageId = 3;
- required bytes ack = 4;
+ optional bytes ack = 4;
+ optional string subscriptionKey = 5; // Set if it is a topic ack.
}
message KahaPrepareCommand {
@@ -95,6 +97,18 @@
required KahaDestination destination = 1;
}
+
+message KahaSubscriptionCommand {
+ //| option java_implments =
"org.apache.kahadb.store.JournalCommand<KahaSubscriptionCommand>";
+ //| option java_visitor =
"org.apache.kahadb.store.Visitor:void:java.io.IOException";
+ //| option java_type_method = "KahaEntryType";
+
+ required KahaDestination destination = 1;
+ required string subscriptionKey = 2;
+ optional bool retroactive = 3;
+ optional bytes subscriptionInfo = 4;
+}
+
message KahaDestination {
enum DestinationType {
QUEUE = 0;