Author: rajdavies
Date: Wed Jul 18 13:32:45 2007
New Revision: 557389
URL: http://svn.apache.org/viewvc?view=rev&rev=557389
Log:
Split out Transaction class from AMQTransactionStore -
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java
(with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java?view=diff&rev=557389&r1=557388&r2=557389
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
Wed Jul 18 13:32:45 2007
@@ -19,13 +19,10 @@
package org.apache.activemq.store.amq;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
-
import javax.transaction.xa.XAException;
-
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
@@ -39,92 +36,15 @@
/**
*/
-public class AMQTransactionStore implements TransactionStore {
+public class AMQTransactionStore implements TransactionStore{
private final AMQPersistenceAdapter peristenceAdapter;
- Map<TransactionId, Tx> inflightTransactions = new
LinkedHashMap<TransactionId, Tx>();
- Map<TransactionId, Tx> preparedTransactions = new
LinkedHashMap<TransactionId, Tx>();
+ Map<TransactionId,AMQTx> inflightTransactions=new
LinkedHashMap<TransactionId,AMQTx>();
+ Map<TransactionId,AMQTx> preparedTransactions=new
LinkedHashMap<TransactionId,AMQTx>();
private boolean doingRecover;
-
- public static class TxOperation {
-
- static final byte ADD_OPERATION_TYPE = 0;
- static final byte REMOVE_OPERATION_TYPE = 1;
- static final byte ACK_OPERATION_TYPE = 3;
-
- public byte operationType;
- public AMQMessageStore store;
- public Object data;
- public Location location;
-
- public TxOperation(byte operationType, AMQMessageStore store, Object
data, Location location) {
- this.operationType=operationType;
- this.store=store;
- this.data=data;
- this.location=location;
- }
-
- }
- /**
- * Operations
- * @version $Revision: 1.6 $
- */
- public static class Tx {
-
- private final Location location;
- private ArrayList<TxOperation> operations = new
ArrayList<TxOperation>();
-
- public Tx(Location location) {
- this.location=location;
- }
-
- public void add(AMQMessageStore store, Message msg, Location location)
{
- operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE,
store, msg, location));
- }
-
- public void add(AMQMessageStore store, MessageAck ack) {
- operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE,
store, ack, null));
- }
-
- public void add(AMQTopicMessageStore store, JournalTopicAck ack) {
- operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE,
store, ack, null));
- }
-
- public Message[] getMessages() {
- ArrayList<Object> list = new ArrayList<Object>();
- for (Iterator<TxOperation> iter = operations.iterator();
iter.hasNext();) {
- TxOperation op = iter.next();
- if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) {
- list.add(op.data);
- }
- }
- Message rc[] = new Message[list.size()];
- list.toArray(rc);
- return rc;
- }
-
- public MessageAck[] getAcks() {
- ArrayList<Object> list = new ArrayList<Object>();
- for (Iterator<TxOperation> iter = operations.iterator();
iter.hasNext();) {
- TxOperation op = iter.next();
- if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) {
- list.add(op.data);
- }
- }
- MessageAck rc[] = new MessageAck[list.size()];
- list.toArray(rc);
- return rc;
- }
-
- public ArrayList<TxOperation> getOperations() {
- return operations;
- }
-
- }
-
- public AMQTransactionStore(AMQPersistenceAdapter adapter) {
- this.peristenceAdapter = adapter;
+ public AMQTransactionStore(AMQPersistenceAdapter adapter){
+ this.peristenceAdapter=adapter;
}
/**
@@ -132,7 +52,7 @@
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void prepare(TransactionId txid) throws IOException{
- Tx tx=null;
+ AMQTx tx=null;
synchronized(inflightTransactions){
tx=inflightTransactions.remove(txid);
}
@@ -143,13 +63,13 @@
preparedTransactions.put(txid,tx);
}
}
-
+
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void replayPrepare(TransactionId txid) throws IOException{
- Tx tx=null;
+ AMQTx tx=null;
synchronized(inflightTransactions){
tx=inflightTransactions.remove(txid);
}
@@ -160,13 +80,13 @@
}
}
- public Tx getTx(TransactionId txid,Location location){
- Tx tx=null;
+ public AMQTx getTx(TransactionId txid,Location location){
+ AMQTx tx=null;
synchronized(inflightTransactions){
tx=inflightTransactions.get(txid);
}
if(tx==null){
- tx=new Tx(location);
+ tx=new AMQTx(location);
inflightTransactions.put(txid,tx);
}
return tx;
@@ -177,7 +97,7 @@
* @see
org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid,boolean wasPrepared) throws
IOException{
- Tx tx;
+ AMQTx tx;
if(wasPrepared){
synchronized(preparedTransactions){
tx=preparedTransactions.remove(txid);
@@ -201,7 +121,7 @@
* @throws XAException
* @see
org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
- public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws
IOException{
+ public AMQTx replayCommit(TransactionId txid,boolean wasPrepared) throws
IOException{
if(wasPrepared){
synchronized(preparedTransactions){
return preparedTransactions.remove(txid);
@@ -218,7 +138,7 @@
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void rollback(TransactionId txid) throws IOException{
- Tx tx=null;
+ AMQTx tx=null;
synchronized(inflightTransactions){
tx=inflightTransactions.remove(txid);
}
@@ -251,13 +171,13 @@
}
}
}
-
- public void start() throws Exception {
+
+ public void start() throws Exception{
}
- public void stop() throws Exception {
+ public void stop() throws Exception{
}
-
+
synchronized public void recover(TransactionRecoveryListener listener)
throws IOException{
// All the in-flight transactions get rolled back..
synchronized(inflightTransactions){
@@ -265,13 +185,13 @@
}
this.doingRecover=true;
try{
- Map<TransactionId, Tx> txs=null;
+ Map<TransactionId,AMQTx> txs=null;
synchronized(preparedTransactions){
- txs=new LinkedHashMap<TransactionId, Tx>(preparedTransactions);
+ txs=new
LinkedHashMap<TransactionId,AMQTx>(preparedTransactions);
}
for(Iterator<TransactionId>
iter=txs.keySet().iterator();iter.hasNext();){
Object txid=iter.next();
- Tx tx=txs.get(txid);
+ AMQTx tx=txs.get(txid);
listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks());
}
}finally{
@@ -283,26 +203,24 @@
* @param message
* @throws IOException
*/
- void addMessage(AMQMessageStore store, Message message, Location location)
throws IOException {
- Tx tx = getTx(message.getTransactionId(), location);
- tx.add(store, message, location);
+ void addMessage(AMQMessageStore store,Message message,Location location)
throws IOException{
+ AMQTx tx=getTx(message.getTransactionId(),location);
+ tx.add(store,message,location);
}
/**
* @param ack
* @throws IOException
*/
- public void removeMessage(AMQMessageStore store, MessageAck ack, Location
location) throws IOException {
- Tx tx = getTx(ack.getTransactionId(), location);
- tx.add(store, ack);
- }
-
-
- public void acknowledge(AMQTopicMessageStore store, JournalTopicAck ack,
Location location) {
- Tx tx = getTx(ack.getTransactionId(), location);
- tx.add(store, ack);
+ public void removeMessage(AMQMessageStore store,MessageAck ack,Location
location) throws IOException{
+ AMQTx tx=getTx(ack.getTransactionId(),location);
+ tx.add(store,ack);
}
+ public void acknowledge(AMQTopicMessageStore store,JournalTopicAck
ack,Location location){
+ AMQTx tx=getTx(ack.getTransactionId(),location);
+ tx.add(store,ack);
+ }
public Location checkpoint() throws IOException{
// Nothing really to checkpoint.. since, we don't
@@ -312,18 +230,18 @@
// roll over active tx records.
Location rc=null;
synchronized(inflightTransactions){
- for(Iterator<Tx>
iter=inflightTransactions.values().iterator();iter.hasNext();){
- Tx tx=iter.next();
- Location location=tx.location;
+ for(Iterator<AMQTx>
iter=inflightTransactions.values().iterator();iter.hasNext();){
+ AMQTx tx=iter.next();
+ Location location=tx.getLocation();
if(rc==null||rc.compareTo(location)<0){
rc=location;
}
}
}
synchronized(preparedTransactions){
- for(Iterator<Tx>
iter=preparedTransactions.values().iterator();iter.hasNext();){
- Tx tx=iter.next();
- Location location=tx.location;
+ for(Iterator<AMQTx>
iter=preparedTransactions.values().iterator();iter.hasNext();){
+ AMQTx tx=iter.next();
+ Location location=tx.getLocation();
if(rc==null||rc.compareTo(location)<0){
rc=location;
}
@@ -332,9 +250,24 @@
}
}
- public boolean isDoingRecover() {
+ public boolean isDoingRecover(){
return doingRecover;
}
+ /**
+ * @return the preparedTransactions
+ */
+ public Map<TransactionId,AMQTx> getPreparedTransactions(){
+ return this.preparedTransactions;
+ }
+ /**
+ * Replaces the contents of the prepared-transaction map with the entries
+ * of the supplied map. A null argument, or the very map instance returned
+ * by getPreparedTransactions(), leaves the current contents untouched.
+ *
+ * @param preparedTransactions the preparedTransactions to set
+ */
+ public void setPreparedTransactions(Map<TransactionId,AMQTx> preparedTransactions){
+     // Handing our own map back must be a no-op: clear() followed by
+     // putAll() on the same instance would discard every prepared transaction.
+     if(preparedTransactions==null||preparedTransactions==this.preparedTransactions){
+         return;
+     }
+     // Hold the same monitor the other methods of this class take while
+     // reading or modifying the prepared-transaction map.
+     synchronized(this.preparedTransactions){
+         this.preparedTransactions.clear();
+         this.preparedTransactions.putAll(preparedTransactions);
+     }
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java?view=diff&rev=557389&r1=557388&r2=557389
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java
Wed Jul 18 13:32:45 2007
@@ -15,10 +15,84 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.activemq.store.amq;
+import java.util.ArrayList;
+import java.util.Iterator;
+import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.kaha.impl.async.Location;
+
+
+/**
+ */
/**
- * @version $Revision: 1.1 $
+ * Operations
+ * @version $Revision: 1.6 $
*/
-public interface AMQTx {
+public class AMQTx{
+
+    /** Location handed to the constructor; never changes afterwards. */
+    private final Location location;
+    /** Operations of this transaction, in the order they were added. */
+    private ArrayList<AMQTxOperation> operations=new ArrayList<AMQTxOperation>();
+
+    /**
+     * @param location the location to associate with this transaction
+     */
+    public AMQTx(Location location){
+        this.location=location;
+    }
+
+    /**
+     * Appends an ADD operation for the store's destination.
+     *
+     * @param store the store whose destination the message belongs to
+     * @param msg the message being added
+     * @param location the location recorded with the operation
+     */
+    public void add(AMQMessageStore store,Message msg,Location location){
+        operations.add(new AMQTxOperation(AMQTxOperation.ADD_OPERATION_TYPE,store.getDestination(),msg,location));
+    }
+
+    /**
+     * Appends a REMOVE operation for the store's destination; no location
+     * is recorded with it.
+     *
+     * @param store the store whose destination the ack refers to
+     * @param ack the acknowledgement being recorded
+     */
+    public void add(AMQMessageStore store,MessageAck ack){
+        operations.add(new AMQTxOperation(AMQTxOperation.REMOVE_OPERATION_TYPE,store.getDestination(),ack,null));
+    }
+
+    /**
+     * Appends an ACK operation for the topic store's destination; no
+     * location is recorded with it.
+     *
+     * @param store the topic store whose destination the ack refers to
+     * @param ack the topic acknowledgement being recorded
+     */
+    public void add(AMQTopicMessageStore store,JournalTopicAck ack){
+        operations.add(new AMQTxOperation(AMQTxOperation.ACK_OPERATION_TYPE,store.getDestination(),ack,null));
+    }
+
+    /**
+     * @return the data of every ADD operation, in insertion order
+     */
+    public Message[] getMessages(){
+        ArrayList<Object> payloads=collectData(AMQTxOperation.ADD_OPERATION_TYPE);
+        return payloads.toArray(new Message[payloads.size()]);
+    }
+
+    /**
+     * @return the data of every REMOVE operation, in insertion order
+     */
+    public MessageAck[] getAcks(){
+        ArrayList<Object> payloads=collectData(AMQTxOperation.REMOVE_OPERATION_TYPE);
+        return payloads.toArray(new MessageAck[payloads.size()]);
+    }
+
+    /**
+     * @return the location
+     */
+    public Location getLocation(){
+        return this.location;
+    }
+
+    /**
+     * @return the live list of operations (not a copy)
+     */
+    public ArrayList<AMQTxOperation> getOperations(){
+        return operations;
+    }
+
+    /**
+     * @param operations the list that replaces the current operations
+     */
+    public void setOperations(ArrayList<AMQTxOperation> operations){
+        this.operations=operations;
+    }
+
+    /** Gathers the data objects of all operations carrying the given type. */
+    private ArrayList<Object> collectData(byte wantedType){
+        ArrayList<Object> matches=new ArrayList<Object>();
+        for(AMQTxOperation candidate:operations){
+            if(candidate.getOperationType()==wantedType){
+                matches.add(candidate.getData());
+            }
+        }
+        return matches;
+    }
+}
+
+
\ No newline at end of file
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java?view=auto&rev=557389
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java
(added)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java
Wed Jul 18 13:32:45 2007
@@ -0,0 +1,137 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.amq;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+
+
+/**
+ */
+public class AMQTxOperation {
+
+    public static final byte ADD_OPERATION_TYPE=0;
+    public static final byte REMOVE_OPERATION_TYPE=1;
+    public static final byte ACK_OPERATION_TYPE=3;
+    /** One of the *_OPERATION_TYPE constants. */
+    private byte operationType;
+    /** Destination used to look up the message store on replay. */
+    private ActiveMQDestination destination;
+    /** The Message, MessageAck or JournalTopicAck this operation carries. */
+    private Object data;
+    /** May be null: AMQTx records REMOVE and ACK operations without a location. */
+    private Location location;
+
+    /**
+     * Creates an empty operation, to be populated by
+     * {@link #readExternal(WireFormat, DataInput)}.
+     */
+    public AMQTxOperation() {
+    }
+
+    /**
+     * @param operationType one of the *_OPERATION_TYPE constants
+     * @param destination the destination the operation applies to
+     * @param data the command object carried by the operation
+     * @param location the location of the operation, may be null
+     */
+    public AMQTxOperation(byte operationType,ActiveMQDestination destination,Object data,Location location){
+        this.operationType=operationType;
+        this.destination=destination;
+        this.data=data;
+        this.location=location;
+    }
+
+    /**
+     * @return the data
+     */
+    public Object getData(){
+        return this.data;
+    }
+
+    /**
+     * @param data the data to set
+     */
+    public void setData(Object data){
+        this.data=data;
+    }
+
+    /**
+     * @return the location
+     */
+    public Location getLocation(){
+        return this.location;
+    }
+
+    /**
+     * @param location the location to set
+     */
+    public void setLocation(Location location){
+        this.location=location;
+    }
+
+    /**
+     * @return the operationType
+     */
+    public byte getOperationType(){
+        return this.operationType;
+    }
+
+    /**
+     * @param operationType the operationType to set
+     */
+    public void setOperationType(byte operationType){
+        this.operationType=operationType;
+    }
+
+    /**
+     * Re-applies this operation to the message store the adapter creates
+     * for the destination.
+     *
+     * @param adapter the adapter used to obtain the message store
+     * @param context the connection context handed to the store
+     * @return the value returned by the store's replay method
+     * @throws IOException if the store fails
+     */
+    public boolean replay(AMQPersistenceAdapter adapter,ConnectionContext context) throws IOException{
+        boolean result=false;
+        AMQMessageStore store=(AMQMessageStore)adapter.createMessageStore(destination);
+        if(operationType==ADD_OPERATION_TYPE){
+            result=store.replayAddMessage(context,(Message)data,location);
+        }else if(operationType==REMOVE_OPERATION_TYPE){
+            result=store.replayRemoveMessage(context,(MessageAck)data);
+        }else{
+            // Any other type is handled as a topic acknowledgement.
+            // NOTE(review): assumes createMessageStore() yields an
+            // AMQTopicMessageStore for topic destinations - confirm.
+            JournalTopicAck ack=(JournalTopicAck)data;
+            result=((AMQTopicMessageStore)store).replayAcknowledge(context,ack.getClientId(),ack.getSubscritionName(),
+                    ack.getMessageId());
+        }
+        return result;
+    }
+
+    /**
+     * Writes this operation as: operation type byte, location-present flag,
+     * the location (only when present), then the length-prefixed marshalled
+     * data and the length-prefixed marshalled destination.
+     *
+     * @param wireFormat used to marshal the data and the destination
+     * @param dos the output to write to
+     * @throws IOException if writing fails
+     */
+    public void writeExternal(WireFormat wireFormat,DataOutput dos) throws IOException {
+        // The type must be persisted, otherwise every operation read back
+        // would default to ADD_OPERATION_TYPE (0).
+        dos.writeByte(operationType);
+        // REMOVE and ACK operations are created with a null location.
+        dos.writeBoolean(location!=null);
+        if(location!=null){
+            location.writeExternal(dos);
+        }
+        ByteSequence packet = wireFormat.marshal(getData());
+        dos.writeInt(packet.length);
+        dos.write(packet.data, packet.offset, packet.length);
+        packet = wireFormat.marshal(destination);
+        dos.writeInt(packet.length);
+        dos.write(packet.data, packet.offset, packet.length);
+    }
+
+    /**
+     * Restores this operation from the layout produced by
+     * {@link #writeExternal(WireFormat, DataOutput)}.
+     *
+     * @param wireFormat used to unmarshal the data and the destination
+     * @param dis the input to read from
+     * @throws IOException if reading fails
+     */
+    public void readExternal(WireFormat wireFormat,DataInput dis) throws IOException {
+        this.operationType=dis.readByte();
+        if(dis.readBoolean()){
+            this.location=new Location();
+            this.location.readExternal(dis);
+        }else{
+            this.location=null;
+        }
+        int size=dis.readInt();
+        byte[] data=new byte[size];
+        dis.readFully(data);
+        setData(wireFormat.unmarshal(new ByteSequence(data)));
+        size=dis.readInt();
+        data=new byte[size];
+        dis.readFully(data);
+        this.destination=(ActiveMQDestination)wireFormat.unmarshal(new ByteSequence(data));
+    }
+}
Propchange:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java?view=auto&rev=557389
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java
(added)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java
Wed Jul 18 13:32:45 2007
@@ -0,0 +1,63 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.store.amq.AMQTx;
+import org.apache.activemq.store.amq.AMQTxOperation;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * Marshall an AMQTx
+ * @version $Revision: 1.10 $
+ */
+public class AMQTxMarshaller implements Marshaller<AMQTx>{
+
+    /** Handed to every operation when it reads or writes itself. */
+    private WireFormat wireFormat;
+
+    /**
+     * @param wireFormat the wire format passed on to each AMQTxOperation
+     */
+    public AMQTxMarshaller(WireFormat wireFormat){
+        this.wireFormat=wireFormat;
+    }
+
+    /**
+     * Reads a transaction: its location, an int operation count, then that
+     * many operations.
+     *
+     * @param dataIn the input to read from
+     * @return the transaction that was read
+     * @throws IOException if reading fails
+     */
+    public AMQTx readPayload(DataInput dataIn) throws IOException{
+        Location txLocation=new Location();
+        txLocation.readExternal(dataIn);
+        AMQTx tx=new AMQTx(txLocation);
+        for(int remaining=dataIn.readInt();remaining>0;remaining--){
+            AMQTxOperation operation=new AMQTxOperation();
+            operation.readExternal(wireFormat,dataIn);
+            tx.getOperations().add(operation);
+        }
+        return tx;
+    }
+
+    /**
+     * Writes a transaction: its location, the number of operations as an
+     * int, then each operation in list order.
+     *
+     * @param amqtx the transaction to write
+     * @param dataOut the output to write to
+     * @throws IOException if writing fails
+     */
+    public void writePayload(AMQTx amqtx,DataOutput dataOut) throws IOException{
+        Location txLocation=amqtx.getLocation();
+        txLocation.writeExternal(dataOut);
+        List<AMQTxOperation> operations=amqtx.getOperations();
+        int count=operations.size();
+        dataOut.writeInt(count);
+        for(AMQTxOperation operation:operations){
+            operation.writeExternal(wireFormat,dataOut);
+        }
+    }
+}
Propchange:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java
------------------------------------------------------------------------------
svn:eol-style = native