Author: chirino
Date: Thu Sep 4 21:55:29 2008
New Revision: 692336
URL: http://svn.apache.org/viewvc?rev=692336&view=rev
Log:
Got XA recovery working
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
activemq/sandbox/kahadb/src/main/proto/journal-data.proto
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java?rev=692336&r1=692335&r2=692336&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
Thu Sep 4 21:55:29 2008
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.Map.Entry;
@@ -54,6 +55,8 @@
import org.apache.activemq.wireformat.WireFormat;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.store.MessageDatabase.AddOpperation;
+import org.apache.kahadb.store.MessageDatabase.Operation;
import org.apache.kahadb.store.MessageDatabase.StoredDestination;
import org.apache.kahadb.store.data.KahaAddMessageCommand;
import org.apache.kahadb.store.data.KahaCommitCommand;
@@ -96,17 +99,41 @@
public TransactionStore createTransactionStore() throws IOException {
return new TransactionStore(){
+
public void commit(TransactionId txid, boolean wasPrepared) throws
IOException {
store(new
KahaCommitCommand().setTransactionInfo(createTransactionInfo(txid)), true);
}
public void prepare(TransactionId txid) throws IOException {
store(new
KahaPrepareCommand().setTransactionInfo(createTransactionInfo(txid)), true);
}
- public void recover(TransactionRecoveryListener listener) throws
IOException {
- }
public void rollback(TransactionId txid) throws IOException {
store(new
KahaRollbackCommand().setTransactionInfo(createTransactionInfo(txid)), false);
}
+ public void recover(TransactionRecoveryListener listener) throws
IOException {
+ for (Map.Entry<TransactionId, ArrayList<Operation>> entry :
preparedTransactions.entrySet()) {
+ XATransactionId xid = (XATransactionId)entry.getKey();
+ ArrayList<Message> messageList = new ArrayList<Message>();
+ ArrayList<MessageAck> ackList = new
ArrayList<MessageAck>();
+
+ for (Operation op : entry.getValue()) {
+ if( op.getClass() == AddOpperation.class ) {
+ AddOpperation addOp = (AddOpperation)op;
+ Message msg = (Message)wireFormat.unmarshal( new
DataInputStream(addOp.getCommand().getMessage().newInput()) );
+ messageList.add(msg);
+ } else {
+ RemoveOpperation rmOp = (RemoveOpperation)op;
+ MessageAck ack = (MessageAck)wireFormat.unmarshal(
new DataInputStream(rmOp.getCommand().getAck().newInput()) );
+ ackList.add(ack);
+ }
+ }
+
+ Message[] addedMessages = new Message[messageList.size()];
+ MessageAck[] acks = new MessageAck[ackList.size()];
+ messageList.toArray(addedMessages);
+ ackList.toArray(acks);
+ listener.recover(xid, addedMessages, acks);
+ }
+ }
public void start() throws Exception {
}
public void stop() throws Exception {
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=692336&r1=692335&r2=692336&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
Thu Sep 4 21:55:29 2008
@@ -535,16 +535,15 @@
}
}
- class Operation {
+ abstract class Operation {
final Location location;
public Operation(Location location) {
this.location = location;
}
- public void execute(Transaction tx) throws IOException {
- }
+ abstract public void execute(Transaction tx) throws IOException;
}
- private class AddOpperation extends Operation {
+ class AddOpperation extends Operation {
final KahaAddMessageCommand command;
public AddOpperation(KahaAddMessageCommand command, Location location)
{
@@ -555,9 +554,13 @@
public void execute(Transaction tx) throws IOException {
upadateIndex(tx, command, location);
}
+
+ public KahaAddMessageCommand getCommand() {
+ return command;
+ }
}
- private class RemoveOpperation extends Operation {
+ class RemoveOpperation extends Operation {
final KahaRemoveMessageCommand command;
public RemoveOpperation(KahaRemoveMessageCommand command, Location
location) {
@@ -568,6 +571,10 @@
public void execute(Transaction tx) throws IOException {
updateIndex(tx, command, location);
}
+
+ public KahaRemoveMessageCommand getCommand() {
+ return command;
+ }
}
Modified: activemq/sandbox/kahadb/src/main/proto/journal-data.proto
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/proto/journal-data.proto?rev=692336&r1=692335&r2=692336&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/proto/journal-data.proto (original)
+++ activemq/sandbox/kahadb/src/main/proto/journal-data.proto Thu Sep 4
21:55:29 2008
@@ -61,6 +61,7 @@
optional KahaTransactionInfo transaction_info=1;
required KahaDestination destination = 2;
required string messageId = 3;
+ required bytes ack = 4;
}
message KahaPrepareCommand {