Author: rajdavies
Date: Fri Apr 7 15:02:07 2006
New Revision: 392434
URL: http://svn.apache.org/viewcvs?rev=392434&view=rev
Log:
fix for http://issues.apache.org/activemq/browse/AMQ-676
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java
(with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
(with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java
(with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/KahaXARecoveryBrokerTest.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?rev=392434&r1=392433&r2=392434&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
Fri Apr 7 15:02:07 2006
@@ -38,6 +38,10 @@
this.messageContainer=container;
this.destination=destination;
}
+
+ public Object getId(){
+ return messageContainer.getId();
+ }
public void addMessage(ConnectionContext context,Message message) throws
IOException{
messageContainer.put(message.getMessageId().toString(),message);
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java?rev=392434&r1=392433&r2=392434&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java
Fri Apr 7 15:02:07 2006
@@ -18,7 +18,6 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
-
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@@ -34,8 +33,6 @@
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
-import org.apache.activemq.store.memory.MemoryTransactionStore;
-
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/**
* @org.apache.xbean.XBean
@@ -43,20 +40,21 @@
* @version $Revision: 1.4 $
*/
public class KahaPersistentAdaptor implements PersistenceAdapter{
- MemoryTransactionStore transactionStore;
+ static final String PREPARED_TRANSACTIONS_NAME="PreparedTransactions";
+ KahaTransactionStore transactionStore;
ConcurrentHashMap topics=new ConcurrentHashMap();
ConcurrentHashMap queues=new ConcurrentHashMap();
+ ConcurrentHashMap messageStores=new ConcurrentHashMap();
private boolean useExternalMessageReferences;
- private WireFormat wireFormat = new OpenWireFormat();
+ private OpenWireFormat wireFormat=new OpenWireFormat();
Store store;
public KahaPersistentAdaptor(File dir) throws IOException{
- if (!dir.exists()){
+ if(!dir.exists()){
dir.mkdirs();
}
- String name = dir.getAbsolutePath() + File.separator + "kaha.db";
+ String name=dir.getAbsolutePath()+File.separator+"kaha.db";
store=StoreFactory.open(name,"rw");
-
}
public Set getDestinations(){
@@ -74,6 +72,7 @@
MessageStore rc=(MessageStore) queues.get(destination);
if(rc==null){
rc=new KahaMessageStore(getMapContainer(destination),destination);
+ messageStores.put(destination, rc);
if(transactionStore!=null){
rc=transactionStore.proxy(rc);
}
@@ -92,17 +91,28 @@
ackContainer.setValueMarshaller(new AtomicIntegerMarshaller());
ackContainer.load();
rc=new
KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination);
+ messageStores.put(destination, rc);
if(transactionStore!=null){
rc=transactionStore.proxy(rc);
}
topics.put(destination,rc);
+
}
return rc;
}
+ protected MessageStore retrieveMessageStore(Object id){
+ MessageStore result = (MessageStore) messageStores.get(id);
+ return result;
+ }
+
public TransactionStore createTransactionStore() throws IOException{
if(transactionStore==null){
- transactionStore=new MemoryTransactionStore();
+ MapContainer
container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME);
+ container.setKeyMarshaller(new CommandMarshaller(wireFormat));
+ container.setValueMarshaller(new
TransactionMarshaller(wireFormat));
+ container.load();
+ transactionStore=new KahaTransactionStore(this,container);
}
return transactionStore;
}
@@ -155,8 +165,8 @@
}
/**
- * @param usageManager The UsageManager that is controlling the broker's
memory usage.
+ * @param usageManager
+ * The UsageManager that is controlling the broker's memory
usage.
*/
- public void setUsageManager(UsageManager usageManager) {
- }
+ public void setUsageManager(UsageManager usageManager){}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?rev=392434&r1=392433&r2=392434&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
Fri Apr 7 15:02:07 2006
@@ -16,7 +16,6 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
-import java.util.Map.Entry;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@@ -55,7 +54,6 @@
public synchronized void addMessage(ConnectionContext context,Message
message) throws IOException{
int subscriberCount=subscriberAcks.size();
if(subscriberCount>0){
- super.addMessage(context,message);
String id=message.getMessageId().toString();
ackContainer.put(id,new AtomicInteger(subscriberCount));
for(Iterator i=subscriberAcks.keySet().iterator();i.hasNext();){
@@ -63,6 +61,7 @@
ListContainer container=store.getListContainer(key);
container.add(id);
}
+ super.addMessage(context,message);
}
}
@@ -79,7 +78,7 @@
ackContainer.put(id,count);
}else{
// no more references to message messageContainer so
remove it
- container.remove(id);
+ super.removeMessage(messageId);
}
}
}
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java?rev=392434&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java
Fri Apr 7 15:02:07 2006
@@ -0,0 +1,100 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.activemq.command.BaseCommand;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.store.MessageStore;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+/**
+ * Stores a messages/acknowledgements for a transaction
+ *
+ * @version $Revision: 1.4 $
+ */
+class KahaTransaction{
+ private static final Log log=LogFactory.getLog(KahaTransaction.class);
+ protected List list=new ArrayList();
+
+
+ void add(KahaMessageStore store,BaseCommand command){
+ TxCommand tx=new TxCommand();
+ tx.setCommand(command);
+ tx.setMessageStoreKey(store.getId());
+ list.add(tx);
+ }
+
+ Message[] getMessages(){
+ List result=new ArrayList();
+ for(int i=0;i<list.size();i++){
+ TxCommand command=(TxCommand) list.get(i);
+ if(command.isAdd()){
+ result.add(command.getCommand());
+ }
+ }
+ Message[] messages=new Message[result.size()];
+ return (Message[]) result.toArray(messages);
+ }
+
+ MessageAck[] getAcks(){
+ List result=new ArrayList();
+ for(int i=0;i<list.size();i++){
+ TxCommand command=(TxCommand) list.get(i);
+ if(command.isRemove()){
+ result.add(command.getCommand());
+ }
+ }
+ MessageAck[] acks=new MessageAck[result.size()];
+ return (MessageAck[]) result.toArray(acks);
+ }
+
+ void prepare(){}
+
+ void rollback(){
+ list.clear();
+ }
+
+ /**
+ * @throws IOException
+ */
+ void commit(KahaTransactionStore transactionStore) throws IOException{
+ for(int i=0;i<list.size();i++){
+ TxCommand command=(TxCommand) list.get(i);
+ MessageStore
ms=transactionStore.getStoreById(command.getMessageStoreKey());
+ if(command.isAdd()){
+ ms.addMessage(null,(Message) command.getCommand());
+ }
+ }
+ for(int i=0;i<list.size();i++){
+ TxCommand command=(TxCommand) list.get(i);
+ MessageStore
ms=transactionStore.getStoreById(command.getMessageStoreKey());
+ if(command.isRemove()){
+ ms.removeMessage(null,(MessageAck) command.getCommand());
+ }
+ }
+ }
+
+ List getList(){
+ return new ArrayList(list);
+ }
+
+ void setList(List list){
+ this.list = list;
+ }
+}
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java
------------------------------------------------------------------------------
svn:executable = *
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java?rev=392434&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
Fri Apr 7 15:02:07 2006
@@ -0,0 +1,176 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import javax.transaction.xa.XAException;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.ProxyMessageStore;
+import org.apache.activemq.store.ProxyTopicMessageStore;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionRecoveryListener;
+import org.apache.activemq.store.TransactionStore;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+/**
+ * Provides a TransactionStore implementation that can create transaction
aware MessageStore objects from non
+ * transaction aware MessageStore objects.
+ *
+ * @version $Revision: 1.4 $
+ */
+public class KahaTransactionStore implements TransactionStore{
+ private Map transactions=new ConcurrentHashMap();
+ private Map prepared;
+ private KahaPersistentAdaptor adaptor;
+
+ KahaTransactionStore(KahaPersistentAdaptor adaptor,Map preparedMap){
+ this.adaptor=adaptor;
+ this.prepared=preparedMap;
+ }
+
+ public MessageStore proxy(MessageStore messageStore){
+ return new ProxyMessageStore(messageStore){
+ public void addMessage(ConnectionContext context,final Message
send) throws IOException{
+ KahaTransactionStore.this.addMessage(getDelegate(),send);
+ }
+
+ public void removeMessage(ConnectionContext context,final
MessageAck ack) throws IOException{
+ KahaTransactionStore.this.removeMessage(getDelegate(),ack);
+ }
+ };
+ }
+
+ public TopicMessageStore proxy(TopicMessageStore messageStore){
+ return new ProxyTopicMessageStore(messageStore){
+ public void addMessage(ConnectionContext context,final Message
send) throws IOException{
+ KahaTransactionStore.this.addMessage(getDelegate(),send);
+ }
+
+ public void removeMessage(ConnectionContext context,final
MessageAck ack) throws IOException{
+ KahaTransactionStore.this.removeMessage(getDelegate(),ack);
+ }
+ };
+ }
+
+ /**
+ * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
+ */
+ public void prepare(TransactionId txid){
+ KahaTransaction tx=getTx(txid);
+ if(tx!=null){
+ tx.prepare();
+ prepared.put(txid,tx);
+ }
+ }
+
+ /**
+ * @throws XAException
+ * @see
org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
+ */
+ public void commit(TransactionId txid,boolean wasPrepared) throws
IOException{
+ KahaTransaction tx=getTx(txid);
+ if(tx!=null){
+ tx.commit(this);
+ removeTx(txid);
+ }
+ }
+
+ /**
+ * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
+ */
+ public void rollback(TransactionId txid){
+ KahaTransaction tx=getTx(txid);
+ if(tx!=null){
+ tx.rollback();
+ removeTx(txid);
+ }
+ }
+
+ public void start() throws Exception{}
+
+ public void stop() throws Exception{}
+
+ synchronized public void recover(TransactionRecoveryListener listener)
throws IOException{
+ for(Iterator i=prepared.entrySet().iterator();i.hasNext();){
+ Map.Entry entry=(Entry) i.next();
+ XATransactionId xid=(XATransactionId) entry.getKey();
+ KahaTransaction kt=(KahaTransaction) entry.getValue();
+ listener.recover(xid,kt.getMessages(),kt.getAcks());
+ }
+ }
+
+ /**
+ * @param message
+ * @throws IOException
+ */
+ void addMessage(final MessageStore destination,final Message message)
throws IOException{
+ if(message.isInTransaction()){
+ KahaTransaction tx=getOrCreateTx(message.getTransactionId());
+ tx.add((KahaMessageStore) destination,message);
+ }else{
+ destination.addMessage(null,message);
+ }
+ }
+
+ /**
+ * @param ack
+ * @throws IOException
+ */
+ private void removeMessage(final MessageStore destination,final MessageAck
ack) throws IOException{
+ if(ack.isInTransaction()){
+ KahaTransaction tx=getOrCreateTx(ack.getTransactionId());
+ tx.add((KahaMessageStore) destination,ack);
+ }else{
+ destination.removeMessage(null,ack);
+ }
+ }
+
+ protected synchronized KahaTransaction getTx(TransactionId key){
+ KahaTransaction result=(KahaTransaction) transactions.get(key);
+ if(result==null){
+ result=(KahaTransaction) prepared.get(key);
+ }
+ return result;
+ }
+
+ protected synchronized KahaTransaction getOrCreateTx(TransactionId key){
+ KahaTransaction result=(KahaTransaction) transactions.get(key);
+ if(result==null){
+ result=new KahaTransaction();
+ transactions.put(key,result);
+ }
+ return result;
+ }
+
+ protected synchronized void removeTx(TransactionId key){
+ transactions.remove(key);
+ prepared.remove(key);
+ }
+
+ public void delete(){
+ transactions.clear();
+ prepared.clear();
+ }
+
+ protected MessageStore getStoreById(Object id){
+ return adaptor.retrieveMessageStore(id);
+ }
+}
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
------------------------------------------------------------------------------
svn:executable = *
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java?rev=392434&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java
Fri Apr 7 15:02:07 2006
@@ -0,0 +1,87 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activeio.command.WireFormat;
+import org.apache.activeio.packet.ByteArrayPacket;
+import org.apache.activeio.packet.Packet;
+import org.apache.activemq.command.BaseCommand;
+import org.apache.activemq.kaha.Marshaller;
+
+/**
+ * Marshall a Transaction
+ * @version $Revision: 1.10 $
+ */
+public class TransactionMarshaller implements Marshaller{
+
+ private WireFormat wireFormat;
+ public TransactionMarshaller(WireFormat wireFormat){
+ this.wireFormat = wireFormat;
+
+ }
+
+ public void writePayload(Object object,DataOutputStream dataOut) throws
IOException{
+ KahaTransaction kt = (KahaTransaction) object;
+ List list = kt.getList();
+ dataOut.writeInt(list.size());
+ for (int i = 0; i < list.size(); i++){
+ TxCommand tx = (TxCommand) list.get(i);
+ Object key = tx.getMessageStoreKey();
+ Packet packet = wireFormat.marshal(key);
+ byte[] data = packet.sliceAsBytes();
+ dataOut.writeInt(data.length);
+ dataOut.write(data);
+ Object command = tx.getCommand();
+ packet = wireFormat.marshal(command);
+ data = packet.sliceAsBytes();
+ dataOut.writeInt(data.length);
+ dataOut.write(data);
+
+ }
+ }
+
+
+ public Object readPayload(DataInputStream dataIn) throws IOException{
+ KahaTransaction result = new KahaTransaction();
+ List list = new ArrayList();
+ result.setList(list);
+ int number=dataIn.readInt();
+ for (int i = 0; i < number; i++){
+ TxCommand command = new TxCommand();
+ int size = dataIn.readInt();
+ byte[] data=new byte[size];
+ dataIn.readFully(data);
+ Object key = wireFormat.unmarshal(new ByteArrayPacket(data));
+ command.setMessageStoreKey(key);
+ size = dataIn.readInt();
+ data=new byte[size];
+ dataIn.readFully(data);
+ BaseCommand bc = (BaseCommand) wireFormat.unmarshal(new
ByteArrayPacket(data));
+ command.setCommand(bc);
+ list.add(command);
+ }
+ return result;
+
+ }
+}
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java?rev=392434&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java
Fri Apr 7 15:02:07 2006
@@ -0,0 +1,76 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadaptor;
+
+import org.apache.activemq.command.BaseCommand;
+import org.apache.activemq.command.CommandTypes;
+
+
+/**
+ * Base class for messages/acknowledgements for a transaction
+ *
+ * @version $Revision: 1.4 $
+ */
+class TxCommand {
+ protected Object messageStoreKey;
+ protected BaseCommand command;
+
+ /**
+ * @return Returns the messageStoreKey.
+ */
+ public Object getMessageStoreKey(){
+ return messageStoreKey;
+ }
+
+ /**
+ * @param messageStoreKey The messageStoreKey to set.
+ */
+ public void setMessageStoreKey(Object messageStoreKey){
+ this.messageStoreKey=messageStoreKey;
+ }
+
+ /**
+ * @return Returns the command.
+ */
+ public BaseCommand getCommand(){
+ return command;
+ }
+
+ /**
+ * @param command The command to set.
+ */
+ public void setCommand(BaseCommand command){
+ this.command=command;
+ }
+
+ /**
+ * @return true if a Message command
+ */
+ public boolean isAdd(){
+ return command != null && command.getDataStructureType() !=
CommandTypes.MESSAGE_ACK;
+ }
+
+ /**
+ * @return true if a MessageAck command
+ */
+ public boolean isRemove(){
+ return command != null && command.getDataStructureType() ==
CommandTypes.MESSAGE_ACK;
+ }
+
+
+
+}
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java
------------------------------------------------------------------------------
svn:executable = *
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/KahaXARecoveryBrokerTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/KahaXARecoveryBrokerTest.java?rev=392434&r1=392433&r2=392434&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/KahaXARecoveryBrokerTest.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/KahaXARecoveryBrokerTest.java
Fri Apr 7 15:02:07 2006
@@ -16,10 +16,12 @@
*/
package org.apache.activemq.broker.store;
+import java.io.File;
import junit.framework.Test;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.XARecoveryBrokerTest;
+import org.apache.activemq.store.kahadaptor.KahaPersistentAdaptor;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.core.io.ClassPathResource;
@@ -39,17 +41,18 @@
}
protected BrokerService createBroker() throws Exception {
- BrokerFactoryBean brokerFactory=new BrokerFactoryBean(new
ClassPathResource("org/apache/activemq/broker/store/kahabroker.xml"));
- brokerFactory.afterPropertiesSet();
- BrokerService broker = brokerFactory.getBroker();
+ BrokerService broker = createRestartedBroker();
broker.setDeleteAllMessagesOnStartup(true);
return broker;
}
protected BrokerService createRestartedBroker() throws Exception {
- BrokerFactoryBean brokerFactory=new BrokerFactoryBean(new
ClassPathResource("org/apache/activemq/broker/store/kahabroker.xml"));
- brokerFactory.afterPropertiesSet();
- return brokerFactory.getBroker();
+ BrokerService broker = new BrokerService();
+
+ KahaPersistentAdaptor adaptor = new KahaPersistentAdaptor(new
File("activemq-data/storetest"));
+ broker.setPersistenceAdapter(adaptor);
+ broker.addConnector("tcp://localhost:0");
+ return broker;
}
}