Author: chirino
Date: Fri Nov 17 12:53:42 2006
New Revision: 476310

URL: http://svn.apache.org/viewvc?view=rev&rev=476310
Log:
Switch from using the RecordLocation interface to the Location class, since
the adapter will need to be aware of which log file the active records are in
so that it can delete unused log files.

Modified:
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java?view=diff&rev=476310&r1=476309&r2=476310
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java
 Fri Nov 17 12:53:42 2006
@@ -18,30 +18,24 @@
 
 package org.apache.activemq.store.rapid;
 
-import org.apache.activeio.journal.RecordLocation;
+import org.apache.activeio.journal.active.Location;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 
 public class RapidMessageReference {
     public final MessageId messageId;
-    public final long expiration;
-    public final RecordLocation location;
+    public final Location location;
     
-    public RapidMessageReference(Message message, RecordLocation location) {
+    public RapidMessageReference(Message message, Location location) {
         this.messageId = message.getMessageId();
-        this.expiration = message.getExpiration();
         this.location=location;
     }
 
-    public long getExpiration() {
-        return expiration;
-    }
-
     public MessageId getMessageId() {
         return messageId;
     }
     
-    public RecordLocation getLocation() {
+    public Location getLocation() {
         return location;
     }
 }

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java?view=diff&rev=476310&r1=476309&r2=476310
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
 Fri Nov 17 12:53:42 2006
@@ -23,7 +23,6 @@
 import java.util.HashSet;
 import java.util.Iterator;
 
-import org.apache.activeio.journal.RecordLocation;
 import org.apache.activeio.journal.active.Location;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -61,7 +60,7 @@
 //    /** A MessageStore that we can use to retrieve messages quickly. */
 //    private LinkedHashMap cpAddedMessageIds;
     
-    protected RecordLocation lastLocation;
+    protected Location lastLocation;
     protected HashSet inFlightTxLocations = new HashSet();
     
     public RapidMessageStore(RapidPersistenceAdapter adapter, 
ActiveMQDestination destination, MapContainer container) {
@@ -82,7 +81,7 @@
         final MessageId id = message.getMessageId();
         
         final boolean debug = log.isDebugEnabled();        
-        final RecordLocation location = 
peristenceAdapter.writeCommand(message, message.isResponseRequired());
+        final Location location = peristenceAdapter.writeCommand(message, 
message.isResponseRequired());
         final RapidMessageReference md = new RapidMessageReference(message, 
location);
         
         if( !context.isInTransaction() ) {
@@ -127,19 +126,19 @@
         }
     }
     
-    static protected String toString(RecordLocation location) {
+    static protected String toString(Location location) {
         Location l = (Location) location;
         return l.getLogFileId()+":"+l.getLogFileOffset();
     }
 
-    static protected RecordLocation toRecordLocation(String t) {
+    static protected Location toLocation(String t) {
         String[] strings = t.split(":");
         if( strings.length!=2 )
             throw new IllegalArgumentException("Invalid location: "+t);
         return new 
Location(Integer.parseInt(strings[0]),Integer.parseInt(strings[1]));
     }
 
-    public void replayAddMessage(ConnectionContext context, Message message, 
RecordLocation location) {
+    public void replayAddMessage(ConnectionContext context, Message message, 
Location location) {
         try {
             RapidMessageReference messageReference = new 
RapidMessageReference(message, location);
             messageContainer.put(message.getMessageId().toString(), 
messageReference);
@@ -157,7 +156,7 @@
         remove.setDestination(destination);
         remove.setMessageAck(ack);
         
-        final RecordLocation location = peristenceAdapter.writeCommand(remove, 
ack.isResponseRequired());
+        final Location location = peristenceAdapter.writeCommand(remove, 
ack.isResponseRequired());
         if( !context.isInTransaction() ) {
             if( debug )
                 log.debug("Journalled message remove for: 
"+ack.getLastMessageId()+", at: "+location);
@@ -190,7 +189,7 @@
         }
     }
     
-    private void removeMessage(final MessageAck ack, final RecordLocation 
location) {
+    private void removeMessage(final MessageAck ack, final Location location) {
         synchronized (this) {
             lastLocation = location;
             MessageId id = ack.getLastMessageId();
@@ -270,7 +269,7 @@
      * @return
      * @throws IOException
      */
-    public RecordLocation checkpoint() throws IOException {
+    public Location checkpoint() throws IOException {
 
         ArrayList cpActiveJournalLocations;
 
@@ -281,7 +280,7 @@
         
         if( cpActiveJournalLocations.size() > 0 ) {
             Collections.sort(cpActiveJournalLocations);
-            return (RecordLocation) cpActiveJournalLocations.get(0);
+            return (Location) cpActiveJournalLocations.get(0);
         } else {
             return lastLocation;
         }

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java?view=diff&rev=476310&r1=476309&r2=476310
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
 Fri Nov 17 12:53:42 2006
@@ -23,12 +23,22 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activeio.journal.InvalidRecordLocationException;
 import org.apache.activeio.journal.Journal;
 import org.apache.activeio.journal.JournalEventListener;
 import org.apache.activeio.journal.RecordLocation;
 import org.apache.activeio.journal.active.JournalImpl;
+import org.apache.activeio.journal.active.Location;
 import org.apache.activeio.packet.ByteArrayPacket;
 import org.apache.activeio.packet.Packet;
 import org.apache.activemq.broker.ConnectionContext;
@@ -67,16 +77,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * An implementation of {@link PersistenceAdapter} designed for use with a
  * {@link Journal} and then check pointing asynchronously on a timeout with some
@@ -471,14 +471,14 @@
      */
     private void recover() throws IllegalStateException, 
InvalidRecordLocationException, IOException, IOException {
 
-        RecordLocation pos = null;
+        Location pos = null;
         int transactionCounter = 0;
 
         log.info("Journal Recovery Started.");
         ConnectionContext context = new ConnectionContext();
 
         // While we have records in the journal.
-        while ((pos = journal.getNextRecordLocation(pos)) != null) {
+        while ((pos = (Location) journal.getNextRecordLocation(pos)) != null) {
             Packet data = journal.read(pos);
             DataStructure c = (DataStructure) 
wireFormat.unmarshal(toByteSequence(data));
 
@@ -603,9 +603,9 @@
      * @return
      * @throws IOException
      */
-    public RecordLocation writeCommand(DataStructure command, boolean sync) 
throws IOException {
+    public Location writeCommand(DataStructure command, boolean sync) throws 
IOException {
         if( started.get() )
-            return journal.write(toPacket(wireFormat.marshal(command)), sync);
+            return (Location) 
journal.write(toPacket(wireFormat.marshal(command)), sync);
         throw new IOException("closed");
     }
 

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=476310&r1=476309&r2=476310
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
 Fri Nov 17 12:53:42 2006
@@ -22,7 +22,8 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.activeio.journal.RecordLocation;
+
+import org.apache.activeio.journal.active.Location;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.JournalTopicAck;
@@ -170,7 +171,7 @@
         ack.setSubscritionName(subscriptionName);
         ack.setClientId(clientId);
         
ack.setTransactionId(context.getTransaction()!=null?context.getTransaction().getTransactionId():null);
-        final RecordLocation 
location=peristenceAdapter.writeCommand(ack,false);
+        final Location location=peristenceAdapter.writeCommand(ack,false);
         final SubscriptionKey key=new 
SubscriptionKey(clientId,subscriptionName);
         if(!context.isInTransaction()){
             if(debug)
@@ -236,7 +237,7 @@
      * @param location
      * @param key
      */
-    private void acknowledge(MessageId messageId,RecordLocation 
location,SubscriptionKey key){
+    private void acknowledge(MessageId messageId,Location 
location,SubscriptionKey key){
         synchronized(this){
             lastLocation=location;
             ackedLastAckLocations.put(key,messageId);
@@ -265,17 +266,17 @@
         return result;
     }
 
-    public RecordLocation checkpoint() throws IOException{
+    public Location checkpoint() throws IOException{
         ArrayList cpAckedLastAckLocations;
         // swap out the hash maps..
         synchronized(this){
             cpAckedLastAckLocations=new 
ArrayList(this.ackedLastAckLocations.values());
             this.ackedLastAckLocations=new HashMap();
         }
-        RecordLocation rc=super.checkpoint();
+        Location rc=super.checkpoint();
         if(!cpAckedLastAckLocations.isEmpty()){
             Collections.sort(cpAckedLastAckLocations);
-            RecordLocation t=(RecordLocation)cpAckedLastAckLocations.get(0);
+            Location t=(Location)cpAckedLastAckLocations.get(0);
             if(rc==null||t.compareTo(rc)<0){
                 rc=t;
             }

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java?view=diff&rev=476310&r1=476309&r2=476310
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java
 Fri Nov 17 12:53:42 2006
@@ -20,10 +20,11 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.transaction.xa.XAException;
 
-import org.apache.activeio.journal.RecordLocation;
+import org.apache.activeio.journal.active.Location;
 import org.apache.activemq.command.JournalTopicAck;
 import org.apache.activemq.command.JournalTransaction;
 import org.apache.activemq.command.Message;
@@ -33,8 +34,6 @@
 import org.apache.activemq.store.TransactionRecoveryListener;
 import org.apache.activemq.store.TransactionStore;
 
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  */
 public class RapidTransactionStore implements TransactionStore {
@@ -54,9 +53,9 @@
         public byte operationType;
         public RapidMessageStore store;
         public Object data;
-        public RecordLocation location;
+        public Location location;
         
-        public TxOperation(byte operationType, RapidMessageStore store, Object 
data, RecordLocation location) {
+        public TxOperation(byte operationType, RapidMessageStore store, Object 
data, Location location) {
             this.operationType=operationType;
             this.store=store;
             this.data=data;
@@ -70,22 +69,22 @@
      */
     public static class Tx {
 
-        private final RecordLocation location;
+        private final Location location;
         private ArrayList operations = new ArrayList();
 
-        public Tx(RecordLocation location) {
+        public Tx(Location location) {
             this.location=location;
         }
 
-        public void add(RapidMessageStore store, Message msg, RecordLocation 
loc) {
+        public void add(RapidMessageStore store, Message msg, Location loc) {
             operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, 
store, msg, loc));
         }
 
-        public void add(RapidMessageStore store, MessageAck ack, 
RecordLocation loc) {
+        public void add(RapidMessageStore store, MessageAck ack, Location loc) 
{
             operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, 
store, ack, loc));
         }
 
-        public void add(RapidTopicMessageStore store, JournalTopicAck ack, 
RecordLocation loc) {
+        public void add(RapidTopicMessageStore store, JournalTopicAck ack, 
Location loc) {
             operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, 
store, ack, loc));
         }
         
@@ -148,7 +147,7 @@
         preparedTransactions.put(txid, tx);
     }
 
-    public Tx getTx(Object txid, RecordLocation location) {
+    public Tx getTx(Object txid, Location location) {
         Tx tx = (Tx) inflightTransactions.get(txid);
         if (tx == null) {
             tx = new Tx(location);
@@ -249,7 +248,7 @@
      * @param message
      * @throws IOException
      */
-    void addMessage(RapidMessageStore store, Message message, RecordLocation 
location) throws IOException {
+    void addMessage(RapidMessageStore store, Message message, Location 
location) throws IOException {
         Tx tx = getTx(message.getTransactionId(), location);
         tx.add(store, message, location);
     }
@@ -258,19 +257,19 @@
      * @param ack
      * @throws IOException
      */
-    public void removeMessage(RapidMessageStore store, MessageAck ack, 
RecordLocation location) throws IOException {
+    public void removeMessage(RapidMessageStore store, MessageAck ack, 
Location location) throws IOException {
         Tx tx = getTx(ack.getTransactionId(), location);
         tx.add(store, ack, location);
     }
     
     
-    public void acknowledge(RapidTopicMessageStore store, JournalTopicAck ack, 
RecordLocation location) {
+    public void acknowledge(RapidTopicMessageStore store, JournalTopicAck ack, 
Location location) {
         Tx tx = getTx(ack.getTransactionId(), location);
         tx.add(store, ack, location);
     }
 
 
-    public RecordLocation checkpoint() throws IOException {
+    public Location checkpoint() throws IOException {
         
         // Nothing really to checkpoint.. since, we don't
         // checkpoint tx operations in to long term store until they are 
committed.
@@ -278,17 +277,17 @@
         // But we keep track of the first location of an operation
         // that was associated with an active tx. The journal can not
         // roll over active tx records.        
-        RecordLocation rc = null;
+        Location rc = null;
         for (Iterator iter = inflightTransactions.values().iterator(); 
iter.hasNext();) {
             Tx tx = (Tx) iter.next();
-            RecordLocation location = tx.location;
+            Location location = tx.location;
             if (rc == null || rc.compareTo(location) < 0) {
                 rc = location;
             }
         }
         for (Iterator iter = preparedTransactions.values().iterator(); 
iter.hasNext();) {
             Tx tx = (Tx) iter.next();
-            RecordLocation location = tx.location;
+            Location location = tx.location;
             if (rc == null || rc.compareTo(location) < 0) {
                 rc = location;
             }


Reply via email to