Author: chirino Date: Fri Nov 17 12:53:42 2006 New Revision: 476310 URL: http://svn.apache.org/viewvc?view=rev&rev=476310 Log: switch from using the RecordLocation interface to the Location interface since the adapter will need to be aware of what log file the active records are in so that it can delete un-used log files.
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java?view=diff&rev=476310&r1=476309&r2=476310 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java Fri Nov 17 12:53:42 2006 @@ -18,30 +18,24 @@ package org.apache.activemq.store.rapid; -import org.apache.activeio.journal.RecordLocation; +import org.apache.activeio.journal.active.Location; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; public class RapidMessageReference { public final MessageId messageId; - public final long expiration; - public final RecordLocation location; + public final Location location; - public RapidMessageReference(Message message, RecordLocation location) { + public RapidMessageReference(Message message, Location location) { this.messageId = message.getMessageId(); - this.expiration = message.getExpiration(); this.location=location; } - public long getExpiration() { - return expiration; - } - public MessageId getMessageId() { return messageId; } - public RecordLocation getLocation() { + public Location getLocation() { return location; } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java?view=diff&rev=476310&r1=476309&r2=476310 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java Fri Nov 17 12:53:42 2006 @@ -23,7 +23,6 @@ import java.util.HashSet; import java.util.Iterator; -import org.apache.activeio.journal.RecordLocation; import org.apache.activeio.journal.active.Location; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; @@ -61,7 +60,7 @@ // /** A MessageStore that we can use to retrieve messages quickly. */ // private LinkedHashMap cpAddedMessageIds; - protected RecordLocation lastLocation; + protected Location lastLocation; protected HashSet inFlightTxLocations = new HashSet(); public RapidMessageStore(RapidPersistenceAdapter adapter, ActiveMQDestination destination, MapContainer container) { @@ -82,7 +81,7 @@ final MessageId id = message.getMessageId(); final boolean debug = log.isDebugEnabled(); - final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired()); + final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired()); final RapidMessageReference md = new RapidMessageReference(message, location); if( !context.isInTransaction() ) { @@ -127,19 +126,19 @@ } } - static protected String toString(RecordLocation location) { + static protected String toString(Location location) { Location l = (Location) location; return l.getLogFileId()+":"+l.getLogFileOffset(); } - static protected RecordLocation toRecordLocation(String t) { + static protected Location toLocation(String t) { String[] strings = t.split(":"); if( strings.length!=2 ) throw new IllegalArgumentException("Invalid location: "+t); return new Location(Integer.parseInt(strings[0]),Integer.parseInt(strings[1])); } - public void replayAddMessage(ConnectionContext context, Message message, RecordLocation location) { + public void replayAddMessage(ConnectionContext context, Message message, Location location) { try { RapidMessageReference messageReference = new RapidMessageReference(message, location); messageContainer.put(message.getMessageId().toString(), messageReference); @@ -157,7 +156,7 @@ remove.setDestination(destination); remove.setMessageAck(ack); - final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired()); + final Location location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired()); if( !context.isInTransaction() ) { if( debug ) log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location); @@ -190,7 +189,7 @@ } } - private void removeMessage(final MessageAck ack, final RecordLocation location) { + private void removeMessage(final MessageAck ack, final Location location) { synchronized (this) { lastLocation = location; MessageId id = ack.getLastMessageId(); @@ -270,7 +269,7 @@ * @return * @throws IOException */ - public RecordLocation checkpoint() throws IOException { + public Location checkpoint() throws IOException { ArrayList cpActiveJournalLocations; @@ -281,7 +280,7 @@ if( cpActiveJournalLocations.size() > 0 ) { Collections.sort(cpActiveJournalLocations); - return (RecordLocation) cpActiveJournalLocations.get(0); + return (Location) cpActiveJournalLocations.get(0); } else { return lastLocation; } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java?view=diff&rev=476310&r1=476309&r2=476310 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java Fri Nov 17 12:53:42 2006 @@ -23,12 +23,22 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activeio.journal.InvalidRecordLocationException; import org.apache.activeio.journal.Journal; import org.apache.activeio.journal.JournalEventListener; import org.apache.activeio.journal.RecordLocation; import org.apache.activeio.journal.active.JournalImpl; +import org.apache.activeio.journal.active.Location; import org.apache.activeio.packet.ByteArrayPacket; import org.apache.activeio.packet.Packet; import org.apache.activemq.broker.ConnectionContext; @@ -67,16 +77,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.FutureTask; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - /** * An implementation of [EMAIL PROTECTED] PersistenceAdapter} designed for use with a * [EMAIL PROTECTED] Journal} and then check pointing asynchronously on a timeout with some @@ -471,14 +471,14 @@ */ private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException { - RecordLocation pos = null; + Location pos = null; int transactionCounter = 0; log.info("Journal Recovery Started."); ConnectionContext context = new ConnectionContext(); // While we have records in the journal. - while ((pos = journal.getNextRecordLocation(pos)) != null) { + while ((pos = (Location) journal.getNextRecordLocation(pos)) != null) { Packet data = journal.read(pos); DataStructure c = (DataStructure) wireFormat.unmarshal(toByteSequence(data)); @@ -603,9 +603,9 @@ * @return * @throws IOException */ - public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException { + public Location writeCommand(DataStructure command, boolean sync) throws IOException { if( started.get() ) - return journal.write(toPacket(wireFormat.marshal(command)), sync); + return (Location) journal.write(toPacket(wireFormat.marshal(command)), sync); throw new IOException("closed"); } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=476310&r1=476309&r2=476310 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Fri Nov 17 12:53:42 2006 @@ -22,7 +22,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.activeio.journal.RecordLocation; + +import org.apache.activeio.journal.active.Location; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.JournalTopicAck; @@ -170,7 +171,7 @@ ack.setSubscritionName(subscriptionName); ack.setClientId(clientId); ack.setTransactionId(context.getTransaction()!=null?context.getTransaction().getTransactionId():null); - final RecordLocation location=peristenceAdapter.writeCommand(ack,false); + final Location location=peristenceAdapter.writeCommand(ack,false); final SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName); if(!context.isInTransaction()){ if(debug) @@ -236,7 +237,7 @@ * @param location * @param key */ - private void acknowledge(MessageId messageId,RecordLocation location,SubscriptionKey key){ + private void acknowledge(MessageId messageId,Location location,SubscriptionKey key){ synchronized(this){ lastLocation=location; ackedLastAckLocations.put(key,messageId); @@ -265,17 +266,17 @@ return result; } - public RecordLocation checkpoint() throws IOException{ + public Location checkpoint() throws IOException{ ArrayList cpAckedLastAckLocations; // swap out the hash maps.. synchronized(this){ cpAckedLastAckLocations=new ArrayList(this.ackedLastAckLocations.values()); this.ackedLastAckLocations=new HashMap(); } - RecordLocation rc=super.checkpoint(); + Location rc=super.checkpoint(); if(!cpAckedLastAckLocations.isEmpty()){ Collections.sort(cpAckedLastAckLocations); - RecordLocation t=(RecordLocation)cpAckedLastAckLocations.get(0); + Location t=(Location)cpAckedLastAckLocations.get(0); if(rc==null||t.compareTo(rc)<0){ rc=t; } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java?view=diff&rev=476310&r1=476309&r2=476310 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java Fri Nov 17 12:53:42 2006 @@ -20,10 +20,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; import javax.transaction.xa.XAException; -import org.apache.activeio.journal.RecordLocation; +import org.apache.activeio.journal.active.Location; import org.apache.activemq.command.JournalTopicAck; import org.apache.activemq.command.JournalTransaction; import org.apache.activemq.command.Message; @@ -33,8 +34,6 @@ import org.apache.activemq.store.TransactionRecoveryListener; import org.apache.activemq.store.TransactionStore; -import java.util.concurrent.ConcurrentHashMap; - /** */ public class RapidTransactionStore implements TransactionStore { @@ -54,9 +53,9 @@ public byte operationType; public RapidMessageStore store; public Object data; - public RecordLocation location; + public Location location; - public TxOperation(byte operationType, RapidMessageStore store, Object data, RecordLocation location) { + public TxOperation(byte operationType, RapidMessageStore store, Object data, Location location) { this.operationType=operationType; this.store=store; this.data=data; @@ -70,22 +69,22 @@ */ public static class Tx { - private final RecordLocation location; + private final Location location; private ArrayList operations = new ArrayList(); - public Tx(RecordLocation location) { + public Tx(Location location) { this.location=location; } - public void add(RapidMessageStore store, Message msg, RecordLocation loc) { + public void add(RapidMessageStore store, Message msg, Location loc) { operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg, loc)); } - public void add(RapidMessageStore store, MessageAck ack, RecordLocation loc) { + public void add(RapidMessageStore store, MessageAck ack, Location loc) { operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack, loc)); } - public void add(RapidTopicMessageStore store, JournalTopicAck ack, RecordLocation loc) { + public void add(RapidTopicMessageStore store, JournalTopicAck ack, Location loc) { operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack, loc)); } @@ -148,7 +147,7 @@ preparedTransactions.put(txid, tx); } - public Tx getTx(Object txid, RecordLocation location) { + public Tx getTx(Object txid, Location location) { Tx tx = (Tx) inflightTransactions.get(txid); if (tx == null) { tx = new Tx(location); @@ -249,7 +248,7 @@ * @param message * @throws IOException */ - void addMessage(RapidMessageStore store, Message message, RecordLocation location) throws IOException { + void addMessage(RapidMessageStore store, Message message, Location location) throws IOException { Tx tx = getTx(message.getTransactionId(), location); tx.add(store, message, location); } @@ -258,19 +257,19 @@ * @param ack * @throws IOException */ - public void removeMessage(RapidMessageStore store, MessageAck ack, RecordLocation location) throws IOException { + public void removeMessage(RapidMessageStore store, MessageAck ack, Location location) throws IOException { Tx tx = getTx(ack.getTransactionId(), location); tx.add(store, ack, location); } - public void acknowledge(RapidTopicMessageStore store, JournalTopicAck ack, RecordLocation location) { + public void acknowledge(RapidTopicMessageStore store, JournalTopicAck ack, Location location) { Tx tx = getTx(ack.getTransactionId(), location); tx.add(store, ack, location); } - public RecordLocation checkpoint() throws IOException { + public Location checkpoint() throws IOException { // Nothing really to checkpoint.. since, we don't // checkpoint tx operations in to long term store until they are committed. @@ -278,17 +277,17 @@ // But we keep track of the first location of an operation // that was associated with an active tx. The journal can not // roll over active tx records. - RecordLocation rc = null; + Location rc = null; for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) { Tx tx = (Tx) iter.next(); - RecordLocation location = tx.location; + Location location = tx.location; if (rc == null || rc.compareTo(location) < 0) { rc = location; } } for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) { Tx tx = (Tx) iter.next(); - RecordLocation location = tx.location; + Location location = tx.location; if (rc == null || rc.compareTo(location) < 0) { rc = location; }