Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java?view=auto&rev=492380 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java Wed Jan 3 17:48:20 2007 @@ -0,0 +1,205 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.quick; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; + +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.JournalTopicAck; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.SubscriptionInfo; +import org.apache.activemq.kaha.impl.async.Location; +import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.store.TopicReferenceStore; +import org.apache.activemq.transaction.Synchronization; +import org.apache.activemq.util.Callback; +import org.apache.activemq.util.SubscriptionKey; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A MessageStore that uses a Journal to store it's messages. + * + * @version $Revision: 1.13 $ + */ +public class QuickTopicMessageStore extends QuickMessageStore implements TopicMessageStore { + + private static final Log log = LogFactory.getLog(QuickTopicMessageStore.class); + + private TopicReferenceStore topicReferenceStore; + private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>(); + + public QuickTopicMessageStore(QuickPersistenceAdapter adapter, TopicReferenceStore checkpointStore, ActiveMQTopic destinationName) { + super(adapter, checkpointStore, destinationName); + this.topicReferenceStore = checkpointStore; + } + + public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception { + this.peristenceAdapter.checkpoint(true); + topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener)); + } + + public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned, final MessageRecoveryListener listener) throws Exception{ + this.peristenceAdapter.checkpoint(true); + topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, new RecoveryListenerAdapter(this, listener)); + } + + public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { + return topicReferenceStore.lookupSubscription(clientId, subscriptionName); + } + + public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException { + this.peristenceAdapter.checkpoint(true); + topicReferenceStore.addSubsciption(clientId, subscriptionName, selector, retroactive); + } + + public void addMessage(ConnectionContext context, Message message) throws IOException { + super.addMessage(context, message); + } + + /** + */ + public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException { + final boolean debug = log.isDebugEnabled(); + + JournalTopicAck ack = new JournalTopicAck(); + ack.setDestination(destination); + ack.setMessageId(messageId); + ack.setMessageSequenceId(messageId.getBrokerSequenceId()); + ack.setSubscritionName(subscriptionName); + ack.setClientId(clientId); + ack.setTransactionId( context.getTransaction()!=null ? context.getTransaction().getTransactionId():null); + final Location location = peristenceAdapter.writeCommand(ack, false); + + final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); + if( !context.isInTransaction() ) { + if( debug ) + log.debug("Journalled acknowledge for: "+messageId+", at: "+location); + acknowledge(messageId, location, key); + } else { + if( debug ) + log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location); + synchronized (this) { + inFlightTxLocations.add(location); + } + transactionStore.acknowledge(this, ack, location); + context.getTransaction().addSynchronization(new Synchronization(){ + public void afterCommit() throws Exception { + if( debug ) + log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location); + synchronized (QuickTopicMessageStore.this) { + inFlightTxLocations.remove(location); + acknowledge(messageId, location, key); + } + } + public void afterRollback() throws Exception { + if( debug ) + log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location); + synchronized (QuickTopicMessageStore.this) { + inFlightTxLocations.remove(location); + } + } + }); + } + + } + + public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) { + try { + SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName); + if( sub != null ) { + topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId); + } + } + catch (Throwable e) { + log.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e); + } + } + + + /** + * @param messageId + * @param location + * @param key + */ + private void acknowledge(MessageId messageId, Location location, SubscriptionKey key) { + synchronized(this) { + lastLocation = location; + ackedLastAckLocations.put(key, messageId); + } + } + + public Location checkpoint() throws IOException { + + final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations; + + // swap out the hash maps.. + synchronized (this) { + cpAckedLastAckLocations = this.ackedLastAckLocations; + this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>(); + } + + return super.checkpoint( new Callback() { + public void execute() throws Exception { + + // Checkpoint the acknowledged messages. + Iterator<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator(); + while (iterator.hasNext()) { + SubscriptionKey subscriptionKey = iterator.next(); + MessageId identity = cpAckedLastAckLocations.get(subscriptionKey); + topicReferenceStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, identity); + } + + } + }); + + } + + /** + * @return Returns the longTermStore. + */ + public TopicReferenceStore getTopicReferenceStore() { + return topicReferenceStore; + } + + public void deleteSubscription(String clientId, String subscriptionName) throws IOException { + topicReferenceStore.deleteSubscription(clientId, subscriptionName); + } + + public SubscriptionInfo[] getAllSubscriptions() throws IOException { + return topicReferenceStore.getAllSubscriptions(); + } + + + public int getMessageCount(String clientId,String subscriberName) throws IOException{ + this.peristenceAdapter.checkpoint(true); + return topicReferenceStore.getMessageCount(clientId,subscriberName); + } + + public void resetBatching(String clientId,String subscriptionName) { + topicReferenceStore.resetBatching(clientId,subscriptionName); + } + + + +} \ No newline at end of file
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java?view=auto&rev=492380 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java Wed Jan 3 17:48:20 2007 @@ -0,0 +1,338 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.store.quick; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; + +import javax.transaction.xa.XAException; + +import org.apache.activemq.command.JournalTopicAck; +import org.apache.activemq.command.JournalTransaction; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.kaha.impl.async.Location; +import org.apache.activemq.store.TransactionRecoveryListener; +import org.apache.activemq.store.TransactionStore; + + +/** + */ +public class QuickTransactionStore implements TransactionStore { + + private final QuickPersistenceAdapter peristenceAdapter; + Map<TransactionId, Tx> inflightTransactions = new LinkedHashMap<TransactionId, Tx>(); + Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>(); + private boolean doingRecover; + + + public static class TxOperation { + + static final byte ADD_OPERATION_TYPE = 0; + static final byte REMOVE_OPERATION_TYPE = 1; + static final byte ACK_OPERATION_TYPE = 3; + + public byte operationType; + public QuickMessageStore store; + public Object data; + + public TxOperation(byte operationType, QuickMessageStore store, Object data) { + this.operationType=operationType; + this.store=store; + this.data=data; + } + + } + /** + * Operations + * @version $Revision: 1.6 $ + */ + public static class Tx { + + private final Location location; + private ArrayList<TxOperation> operations = new ArrayList<TxOperation>(); + + public Tx(Location location) { + this.location=location; + } + + public void add(QuickMessageStore store, Message msg) { + operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg)); + } + + public void add(QuickMessageStore store, MessageAck ack) { + operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack)); + } + + public void add(QuickTopicMessageStore store, JournalTopicAck ack) { + operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack)); + } + + public Message[] getMessages() { + ArrayList<Object> list = new ArrayList<Object>(); + for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) { + TxOperation op = iter.next(); + if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) { + list.add(op.data); + } + } + Message rc[] = new Message[list.size()]; + list.toArray(rc); + return rc; + } + + public MessageAck[] getAcks() { + ArrayList<Object> list = new ArrayList<Object>(); + for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) { + TxOperation op = iter.next(); + if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) { + list.add(op.data); + } + } + MessageAck rc[] = new MessageAck[list.size()]; + list.toArray(rc); + return rc; + } + + public ArrayList<TxOperation> getOperations() { + return operations; + } + + } + + public QuickTransactionStore(QuickPersistenceAdapter adapter) { + this.peristenceAdapter = adapter; + } + + /** + * @throws IOException + * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) + */ + public void prepare(TransactionId txid) throws IOException{ + Tx tx=null; + synchronized(inflightTransactions){ + tx=inflightTransactions.remove(txid); + } + if(tx==null) + return; + peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE,txid,false),true); + synchronized(preparedTransactions){ + preparedTransactions.put(txid,tx); + } + } + + /** + * @throws IOException + * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) + */ + public void replayPrepare(TransactionId txid) throws IOException{ + Tx tx=null; + synchronized(inflightTransactions){ + tx=inflightTransactions.remove(txid); + } + if(tx==null) + return; + synchronized(preparedTransactions){ + preparedTransactions.put(txid,tx); + } + } + + public Tx getTx(TransactionId txid,Location location){ + Tx tx=null; + synchronized(inflightTransactions){ + tx=inflightTransactions.get(txid); + } + if(tx==null){ + tx=new Tx(location); + inflightTransactions.put(txid,tx); + } + return tx; + } + + /** + * @throws XAException + * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) + */ + public void commit(TransactionId txid,boolean wasPrepared) throws IOException{ + Tx tx; + if(wasPrepared){ + synchronized(preparedTransactions){ + tx=preparedTransactions.remove(txid); + } + }else{ + synchronized(inflightTransactions){ + tx=inflightTransactions.remove(txid); + } + } + if(tx==null) + return; + if(txid.isXATransaction()){ + peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT,txid,wasPrepared),true); + }else{ + peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT,txid,wasPrepared), + true); + } + } + + /** + * @throws XAException + * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) + */ + public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{ + if(wasPrepared){ + synchronized(preparedTransactions){ + return preparedTransactions.remove(txid); + } + }else{ + synchronized(inflightTransactions){ + return inflightTransactions.remove(txid); + } + } + } + + /** + * @throws IOException + * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) + */ + public void rollback(TransactionId txid) throws IOException{ + Tx tx=null; + synchronized(inflightTransactions){ + tx=inflightTransactions.remove(txid); + } + if(tx!=null) + synchronized(preparedTransactions){ + tx=preparedTransactions.remove(txid); + } + if(tx!=null){ + if(txid.isXATransaction()){ + peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK,txid,false),true); + }else{ + peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,txid,false), + true); + } + } + } + + /** + * @throws IOException + * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) + */ + public void replayRollback(TransactionId txid) throws IOException{ + boolean inflight=false; + synchronized(inflightTransactions){ + inflight=inflightTransactions.remove(txid)!=null; + } + if(inflight){ + synchronized(preparedTransactions){ + preparedTransactions.remove(txid); + } + } + } + + public void start() throws Exception { + } + + public void stop() throws Exception { + } + + synchronized public void recover(TransactionRecoveryListener listener) throws IOException{ + // All the in-flight transactions get rolled back.. + synchronized(inflightTransactions){ + inflightTransactions.clear(); + } + this.doingRecover=true; + try{ + Map<TransactionId, Tx> txs=null; + synchronized(preparedTransactions){ + txs=new LinkedHashMap<TransactionId, Tx>(preparedTransactions); + } + for(Iterator<TransactionId> iter=txs.keySet().iterator();iter.hasNext();){ + Object txid=iter.next(); + Tx tx=txs.get(txid); + listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks()); + } + }finally{ + this.doingRecover=false; + } + } + + /** + * @param message + * @throws IOException + */ + void addMessage(QuickMessageStore store, Message message, Location location) throws IOException { + Tx tx = getTx(message.getTransactionId(), location); + tx.add(store, message); + } + + /** + * @param ack + * @throws IOException + */ + public void removeMessage(QuickMessageStore store, MessageAck ack, Location location) throws IOException { + Tx tx = getTx(ack.getTransactionId(), location); + tx.add(store, ack); + } + + + public void acknowledge(QuickTopicMessageStore store, JournalTopicAck ack, Location location) { + Tx tx = getTx(ack.getTransactionId(), location); + tx.add(store, ack); + } + + + public Location checkpoint() throws IOException{ + // Nothing really to checkpoint.. since, we don't + // checkpoint tx operations in to long term store until they are committed. + // But we keep track of the first location of an operation + // that was associated with an active tx. The journal can not + // roll over active tx records. + Location rc=null; + synchronized(inflightTransactions){ + for(Iterator<Tx> iter=inflightTransactions.values().iterator();iter.hasNext();){ + Tx tx=iter.next(); + Location location=tx.location; + if(rc==null||rc.compareTo(location)<0){ + rc=location; + } + } + } + synchronized(preparedTransactions){ + for(Iterator<Tx> iter=preparedTransactions.values().iterator();iter.hasNext();){ + Tx tx=iter.next(); + Location location=tx.location; + if(rc==null||rc.compareTo(location)<0){ + rc=location; + } + } + return rc; + } + } + + public boolean isDoingRecover() { + return doingRecover; + } + + +} Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java?view=auto&rev=492380 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java Wed Jan 3 17:48:20 2007 @@ -0,0 +1,50 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.quick; + +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.MessageStore; + +final class RecoveryListenerAdapter implements MessageRecoveryListener { + + private final MessageStore store; + private final MessageRecoveryListener listener; + + RecoveryListenerAdapter(MessageStore store, MessageRecoveryListener listener) { + this.store = store; + this.listener = listener; + } + + public void finished() { + listener.finished(); + } + + public boolean hasSpace() { + return listener.hasSpace(); + } + + public void recoverMessage(Message message) throws Exception { + listener.recoverMessage(message); + } + + public void recoverMessageReference(MessageId ref) throws Exception { + listener.recoverMessage( this.store.getMessage(ref) ); + } +} \ No newline at end of file Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/package.html URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/package.html?view=auto&rev=492380 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/package.html (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/package.html Wed Jan 3 17:48:20 2007 @@ -0,0 +1,27 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<html> +<head> +</head> +<body> + +<p> + +</p> + +</body> +</html> Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java?view=diff&rev=492380&r1=492379&r2=492380 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java (original) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java Wed Jan 3 17:48:20 2007 @@ -54,14 +54,14 @@ } - @Override - public void testTopicDurableConsumerHoldsPersistentMessageAfterRestart() throws Exception { - // TODO: this test is currently failing in base class.. overriden to avoid failure - } - - @Override - public void testQueuePersistentCommitedAcksNotLostOnRestart() throws Exception { - // TODO: this test is currently failing in base class.. overriden to avoid failure - } +// @Override +// public void testTopicDurableConsumerHoldsPersistentMessageAfterRestart() throws Exception { +// // TODO: this test is currently failing in base class.. overriden to avoid failure +// } +// +// @Override +// public void testQueuePersistentCommitedAcksNotLostOnRestart() throws Exception { +// // TODO: this test is currently failing in base class.. overriden to avoid failure +// } } Modified: incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties?view=diff&rev=492380&r1=492379&r2=492380 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties (original) +++ incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties Wed Jan 3 17:48:20 2007 @@ -18,7 +18,7 @@ # # The logging properties used during tests.. # -log4j.rootLogger=INFO, out +log4j.rootLogger=DEBUG, stdout log4j.logger.org.apache.activemq.spring=WARN