Author: rajdavies Date: Wed Nov 29 14:09:33 2006 New Revision: 480731 URL: http://svn.apache.org/viewvc?view=rev&rev=480731 Log: Setting the Store based cursor as the default for Durable Subscribers
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDurableSubscriptionTest.java (with props) Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=480731&r1=480730&r2=480731 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Nov 29 14:09:33 2006 @@ -47,6 +47,7 @@ import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; @@ -140,7 +141,8 @@ private ActiveMQDestination[] destinations; private Store tempDataStore; private int persistenceThreadPriority = Thread.MAX_PRIORITY; - private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy(); + //private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy(); + private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new StorePendingDurableSubscriberMessageStoragePolicy(); /** @@ -383,6 +385,7 @@ startDestinations(); addShutdownHook(); + log.info("Using Persistence Adaptor " + getPersistenceAdapter()); if (deleteAllMessagesOnStartup) { deleteAllMessages(); } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=480731&r1=480730&r2=480731 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed Nov 29 14:09:33 2006 @@ -64,6 +64,9 @@ if( active || keepDurableSubsActive ) { Topic topic = (Topic) destination; topic.activate(context, this); + if (pending.isEmpty(topic)) { + topic.recoverRetroactiveMessages(context, this); + } } dispatchMatched(); } @@ -81,6 +84,13 @@ } synchronized(pending) { pending.start(); + } + //If nothing was in the persistent store, then try to use the recovery policy. + if (pending.isEmpty()) { + for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { + Topic topic = (Topic) iter.next(); + topic.recoverRetroactiveMessages(context, this); + } } dispatchMatched(); } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=480731&r1=480730&r2=480731 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Wed Nov 29 14:09:33 2006 @@ -146,7 +146,6 @@ } public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception { - // synchronize with dispatch method so that no new messages are sent // while // we are recovering a subscription to avoid out of order messages. @@ -210,15 +209,7 @@ }); } - if( true && subscription.getConsumerInfo().isRetroactive() ) { - // If nothing was in the persistent store, then try to use the recovery policy. - if( subscription.getEnqueueCounter() == 0 ) { - subscriptionRecoveryPolicy.recover(context, this, subscription); - } else { - // TODO: implement something like - // subscriptionRecoveryPolicy.recoverNonPersistent(context, this, sub); - } - } + } finally { @@ -231,7 +222,15 @@ consumers.remove(sub); } sub.remove(context, this); - } + } + + + protected void recoverRetroactiveMessages(ConnectionContext context,Subscription subscription) throws Exception{ + if(subscription.getConsumerInfo().isRetroactive()){ + subscriptionRecoveryPolicy.recover(context,this,subscription); + } + } + public void send(final ConnectionContext context, final Message message) throws Exception { Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=480731&r1=480730&r2=480731 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Wed Nov 29 14:09:33 2006 @@ -66,6 +66,10 @@ public boolean isEmpty(){ return false; } + + public boolean isEmpty(Destination destination) { + return isEmpty(); + } public MessageReference next(){ return null; Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=480731&r1=480730&r2=480731 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Wed Nov 29 14:09:33 2006 @@ -50,6 +50,13 @@ public boolean isEmpty(); /** + * check if a Destination is Empty for this cursor + * @param destination + * @return true id the Destination is empty + */ + public boolean isEmpty(Destination destination); + + /** * reset the cursor * */ Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=480731&r1=480730&r2=480731 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Wed Nov 29 14:09:33 2006 @@ -19,6 +19,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.Map; +import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; @@ -85,14 +86,16 @@ * @throws Exception */ public synchronized void add(ConnectionContext context,Destination destination) throws Exception{ - TopicStorePrefetch tsp=new TopicStorePrefetch((Topic)destination,clientId,subscriberName); - tsp.setMaxBatchSize(getMaxBatchSize()); - tsp.setUsageManager(usageManager); - topics.put(destination,tsp); - storePrefetches.add(tsp); - if(started){ - tsp.start(); - pendingCount+=tsp.size(); + if(destination!=null&&!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())){ + TopicStorePrefetch tsp=new TopicStorePrefetch((Topic)destination,clientId,subscriberName); + tsp.setMaxBatchSize(getMaxBatchSize()); + tsp.setUsageManager(usageManager); + topics.put(destination,tsp); + storePrefetches.add(tsp); + if(started){ + tsp.start(); + pendingCount+=tsp.size(); + } } } @@ -115,6 +118,15 @@ */ public synchronized boolean isEmpty(){ return pendingCount<=0; + } + + public boolean isEmpty(Destination destination) { + boolean result = true; + TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(destination); + if(tsp!=null){ + result = tsp.size() <= 0; + } + return result; } /** Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?view=diff&rev=480731&r1=480730&r2=480731 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Wed Nov 29 14:09:33 2006 @@ -88,4 +88,6 @@ public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,int maxReturned, JDBCMessageRecoveryListener listener) throws Exception; + + public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c,ActiveMQDestination destination,String clientId, String subscriberName) throws SQLException,IOException; } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?view=diff&rev=480731&r1=480730&r2=480731 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Wed Nov 29 14:09:33 2006 @@ -99,7 +99,8 @@ String subcriberId=getSubscriptionKey(clientId,subscriptionName); AtomicLong last=(AtomicLong)subscriberLastMessageMap.get(subcriberId); if(last==null){ - last=new AtomicLong(-1); + long lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c,destination,clientId,subscriptionName); + last=new AtomicLong(lastAcked); subscriberLastMessageMap.put(subcriberId,last); } final AtomicLong finalLast=last; Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?view=diff&rev=480731&r1=480730&r2=480731 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Wed Nov 29 14:09:33 2006 @@ -64,7 +64,7 @@ private String lockUpdateStatement; private String nextDurableSubscriberMessageStatement; private String durableSubscriberMessageCountStatement; - private String nextDurableSubscriberMessageIdStatement; + private String lastAckedDurableSubscriberMessageStatement; private String destinationMessageCountStatement; private String findNextMessagesStatement; private boolean useLockCreateWhereClause; @@ -322,6 +322,18 @@ } return findNextMessagesStatement; } + + /** + * @return the lastAckedDurableSubscriberMessageStatement + */ + public String getLastAckedDurableSubscriberMessageStatement(){ + if(lastAckedDurableSubscriberMessageStatement==null) { + lastAckedDurableSubscriberMessageStatement = "SELECT MAX(LAST_ACKED_ID) FROM " + getFullAckTableName() + + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; + } + return lastAckedDurableSubscriberMessageStatement; + } + public String getFullMessageTableName() { @@ -590,20 +602,7 @@ */ public void setDurableSubscriberMessageCountStatement(String durableSubscriberMessageCountStatement){ this.durableSubscriberMessageCountStatement=durableSubscriberMessageCountStatement; - } - - - - - - /** - * @param nextDurableSubscriberMessageIdStatement the nextDurableSubscriberMessageIdStatement to set - */ - public void setNextDurableSubscriberMessageIdStatement(String nextDurableSubscriberMessageIdStatement){ - this.nextDurableSubscriberMessageIdStatement=nextDurableSubscriberMessageIdStatement; - } - - + } /** * @param findNextMessagesStatement the findNextMessagesStatement to set @@ -617,6 +616,16 @@ */ public void setDestinationMessageCountStatement(String destinationMessageCountStatement){ this.destinationMessageCountStatement=destinationMessageCountStatement; + } + + + + + /** + * @param lastAckedDurableSubscriberMessageStatement the lastAckedDurableSubscriberMessageStatement to set + */ + public void setLastAckedDurableSubscriberMessageStatement(String lastAckedDurableSubscriberMessageStatement){ + this.lastAckedDurableSubscriberMessageStatement=lastAckedDurableSubscriberMessageStatement; } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=480731&r1=480730&r2=480731 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Wed Nov 29 14:09:33 2006 @@ -544,6 +544,28 @@ close(s); } } + + public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c,ActiveMQDestination destination,String clientId, String subscriberName) throws SQLException,IOException{ + PreparedStatement s=null; + ResultSet rs=null; + long result = -1; + try{ + s=c.getConnection().prepareStatement(statements.getLastAckedDurableSubscriberMessageStatement()); + s.setString(1,destination.getQualifiedName()); + s.setString(2,clientId); + s.setString(3,subscriberName); + rs=s.executeQuery(); + if(rs.next()){ + result=rs.getLong(1); + } + rs.close(); + s.close(); + }finally{ + close(rs); + close(s); + } + return result; + } static private void close(PreparedStatement s){ try{ Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=480731&r1=480730&r2=480731 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Wed Nov 29 14:09:33 2006 @@ -112,6 +112,7 @@ if(!subscriberContainer.containsKey(key)){ subscriberContainer.put(key,info); } + //add the subscriber ListContainer container=addSubscriberMessageContainer(key); if(retroactive){ for(StoreEntry entry=ackContainer.getFirst();entry!=null;){ @@ -124,24 +125,9 @@ } } - public synchronized void deleteSubscription(String clientId,String subscriptionName){ + public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{ String key=getSubscriptionKey(clientId,subscriptionName); - subscriberContainer.remove(key); - TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); - for(Iterator i=container.getListContainer().iterator();i.hasNext();){ - ConsumerMessageRef ref=(ConsumerMessageRef)i.next(); - if(ref!=null){ - TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry()); - if(tsa!=null){ - if(tsa.decrementCount()<=0){ - ackContainer.remove(ref.getAckEntry()); - messageContainer.remove(tsa.getMessageEntry()); - }else{ - ackContainer.update(ref.getAckEntry(),tsa); - } - } - } - } + removeSubscriberMessageContainer(key); } public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener) @@ -223,6 +209,26 @@ TopicSubContainer tsc=new TopicSubContainer(container); subscriberMessages.put(key,tsc); return container; + } + + protected void removeSubscriberMessageContainer(Object key) throws IOException { + subscriberContainer.remove(key); + TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key); + for(Iterator i=container.getListContainer().iterator();i.hasNext();){ + ConsumerMessageRef ref=(ConsumerMessageRef)i.next(); + if(ref!=null){ + TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry()); + if(tsa!=null){ + if(tsa.decrementCount()<=0){ + ackContainer.remove(ref.getAckEntry()); + messageContainer.remove(tsa.getMessageEntry()); + }else{ + ackContainer.update(ref.getAckEntry(),tsa); + } + } + } + } + store.deleteListContainer(key,"topic-subs"); } public int getMessageCount(String clientId,String subscriberName) throws IOException{ Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=480731&r1=480730&r2=480731 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Wed Nov 29 14:09:33 2006 @@ -132,24 +132,9 @@ } } - public synchronized void deleteSubscription(String clientId,String subscriptionName){ + public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{ String key=getSubscriptionKey(clientId,subscriptionName); - subscriberContainer.remove(key); - TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); - for(Iterator i=container.getListContainer().iterator();i.hasNext();){ - ConsumerMessageRef ref=(ConsumerMessageRef)i.next(); - if(ref!=null){ - TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry()); - if(tsa!=null){ - if(tsa.decrementCount()<=0){ - ackContainer.remove(ref.getAckEntry()); - messageContainer.remove(tsa.getMessageEntry()); - }else{ - ackContainer.update(ref.getAckEntry(),tsa); - } - } - } - } + removeSubscriberMessageContainer(key); } public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener) @@ -220,6 +205,26 @@ TopicSubContainer tsc=new TopicSubContainer(container); subscriberMessages.put(key,tsc); return container; + } + + protected void removeSubscriberMessageContainer(Object key) throws IOException { + subscriberContainer.remove(key); + TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key); + for(Iterator i=container.getListContainer().iterator();i.hasNext();){ + ConsumerMessageRef ref=(ConsumerMessageRef)i.next(); + if(ref!=null){ + TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry()); + if(tsa!=null){ + if(tsa.decrementCount()<=0){ + ackContainer.remove(ref.getAckEntry()); + messageContainer.remove(tsa.getMessageEntry()); + }else{ + ackContainer.update(ref.getAckEntry(),tsa); + } + } + } + } + store.deleteListContainer(key,"topic-subs"); } public int getMessageCount(String clientId,String subscriberName) throws IOException{ Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDurableSubscriptionTest.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDurableSubscriptionTest.java?view=auto&rev=480731 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDurableSubscriptionTest.java (added) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDurableSubscriptionTest.java Wed Nov 29 14:09:33 2006 @@ -0,0 +1,32 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.activemq.usecases; + +import java.io.File; +import java.io.IOException; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; + +/** + * @version $Revision: 1.1.1.1 $ + */ +public class KahaDurableSubscriptionTest extends DurableSubscriptionTestSupport{ + + protected PersistenceAdapter createPersistenceAdapter() throws IOException{ + File dataDir=new File("target/test-data/durableKaha"); + KahaPersistenceAdapter adaptor=new KahaPersistenceAdapter(dataDir); + return adaptor; + } +} Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDurableSubscriptionTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDurableSubscriptionTest.java ------------------------------------------------------------------------------ svn:executable = *