Author: chirino Date: Mon Nov 20 10:13:50 2006 New Revision: 477273 URL: http://svn.apache.org/viewvc?view=rev&rev=477273 Log: Added the initial cut of a JPA based message store.
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java (with props) incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java Modified: incubator/activemq/trunk/activemq-core/pom.xml incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java incubator/activemq/trunk/pom.xml Modified: incubator/activemq/trunk/activemq-core/pom.xml URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/pom.xml?view=diff&rev=477273&r1=477272&r2=477273 ============================================================================== --- incubator/activemq/trunk/activemq-core/pom.xml (original) +++ incubator/activemq/trunk/activemq-core/pom.xml Mon Nov 20 10:13:50 2006 @@ -156,6 +156,10 @@ <version>1.2.24</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.openjpa</groupId> + <artifactId>openjpa-persistence-jdbc</artifactId> + </dependency> </dependencies> <build> @@ -270,6 +274,7 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-antrun-plugin</artifactId> + <!-- <configuration> <tasks> <taskdef name="generate" classname="org.apache.activemq.openwire.tool.JavaGeneratorTask"/> @@ -283,33 +288,38 @@ <version>${activemq-version}</version> </dependency> </dependencies> + --> + + <executions> + <execution> + <phase>process-classes</phase> + <configuration> + <tasks> + <path id="cp"> + <path refid="maven.test.classpath"/> + <path refid="maven.compile.classpath"/> + <path refid="maven.dependency.classpath"/> + </path> + <taskdef name="openjpac" classname="org.apache.openjpa.ant.PCEnhancerTask"> + <classpath refid="cp"/> + </taskdef> + <openjpac directory="${basedir}/target/jpa-classes"> + <classpath refid="cp"/> + <fileset dir="${basedir}/target/classes"> + <include name="org/apache/activemq/store/jpa/model/*.class"/> + </fileset> + </openjpac> + <copy todir="${basedir}/target/classes"> + <fileset dir="${basedir}/target/jpa-classes"/> + </copy> + </tasks> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> </plugin> - - <!-- Use Gram to Gernerate the OpenWire Marshallers --> - <!-- - <plugin> - <groupId>org.apache.activemq</groupId> - <artifactId>maven-gram-plugin</artifactId> - <version>4.1-incubator</version> - <configuration> - <scripts> - :GenerateJavaMarshalling.groovy: GenerateJavaTests.groovy: GenerateCSharpMarshalling.groovy: - GenerateCSharpClasses.groovy: GenerateCppMarshallingClasses.groovy: GenerateCppMarshallingHeaders.groovy: - GenerateCppHeaders.groovy: GenerateCppClasses.groovy: GenerateCMarshalling.groovy: - </scripts> - <groovyProperties> - <version>2</version> - </groovyProperties> - </configuration> - <dependencies> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-openwire-generator</artifactId> - <version>${activemq-version}</version> - </dependency> - </dependencies> - </plugin> - --> <plugin> <groupId>org.codehaus.mojo</groupId> Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java?view=auto&rev=477273 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java Mon Nov 20 10:13:50 2006 @@ -0,0 +1,194 @@ +package org.apache.activemq.store.jpa; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import javax.persistence.EntityManager; +import javax.persistence.Query; + +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.memory.UsageManager; +import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.jpa.model.StoredMessage; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.wireformat.WireFormat; + +public class JPAMessageStore implements MessageStore { + + protected final JPAPersistenceAdapter adapter; + protected final WireFormat wireFormat; + protected final ActiveMQDestination destination; + protected final String destinationName; + protected AtomicLong lastMessageId = new AtomicLong(-1); + + public JPAMessageStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) { + this.adapter = adapter; + this.destination = destination; + this.destinationName = destination.getQualifiedName(); + this.wireFormat = this.adapter.getWireFormat(); + } + + public void addMessage(ConnectionContext context, Message message) throws IOException { + + EntityManager manager = adapter.beginEntityManager(context); + try { + + ByteSequence sequence = wireFormat.marshal(message); + sequence.compact(); + + StoredMessage sm = new StoredMessage(); + sm.setDestination(destinationName); + sm.setId(message.getMessageId().getBrokerSequenceId()); + sm.setMessageId(message.getMessageId().toString()); + sm.setExiration(message.getExpiration()); + sm.setData(sequence.data); + + manager.persist(sm); + + } catch (Throwable e) { + adapter.rollbackEntityManager(context,manager); + throw IOExceptionSupport.create(e); + } + adapter.commitEntityManager(context,manager); + } + + public void addMessageReference(ConnectionContext context, + MessageId messageId, long expirationTime, String messageRef) + throws IOException { + throw new IOException("Not implemented."); + } + + public ActiveMQDestination getDestination() { + return destination; + } + + public Message getMessage(MessageId identity) throws IOException { + Message rc; + EntityManager manager = adapter.beginEntityManager(null); + try { + StoredMessage message=null; + if( identity.getBrokerSequenceId()!= 0 ) { + message = manager.find(StoredMessage.class, identity.getBrokerSequenceId()); + } else { + Query query = manager.createQuery("select m from StoredMessage m where m.messageId=?1"); + query.setParameter(1, identity.toString()); + message = (StoredMessage) query.getSingleResult(); + } + + rc = (Message) wireFormat.unmarshal(new ByteSequence(message.getData())); + } catch (Throwable e) { + adapter.rollbackEntityManager(null,manager); + throw IOExceptionSupport.create(e); + } + adapter.commitEntityManager(null,manager); + return rc; + } + + public int getMessageCount() throws IOException { + Integer rc; + EntityManager manager = adapter.beginEntityManager(null); + try { + Query query = manager.createQuery("select count(m) from StoredMessage m"); + rc = (Integer) query.getSingleResult(); + } catch (Throwable e) { + adapter.rollbackEntityManager(null,manager); + throw IOExceptionSupport.create(e); + } + adapter.commitEntityManager(null,manager); + return rc; + } + + public String getMessageReference(MessageId identity) throws IOException { + throw new IOException("Not implemented."); + } + + public void recover(MessageRecoveryListener container) throws Exception { + EntityManager manager = adapter.beginEntityManager(null); + try { + Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 order by m.id asc"); + query.setParameter(1, destinationName); + for (StoredMessage m : (List<StoredMessage>)query.getResultList()) { + Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData())); + container.recoverMessage(message); + } + } catch (Throwable e) { + adapter.rollbackEntityManager(null,manager); + throw IOExceptionSupport.create(e); + } + adapter.commitEntityManager(null,manager); + } + + public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { + + EntityManager manager = adapter.beginEntityManager(null); + try { + + Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc"); + query.setParameter(1, destinationName); + query.setParameter(2, lastMessageId.get()); + query.setMaxResults(maxReturned); + int count = 0; + for (StoredMessage m : (List<StoredMessage>)query.getResultList()) { + Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData())); + listener.recoverMessage(message); + lastMessageId.set(m.getId()); + count++; + if( count >= maxReturned ) { + return; + } + } + + } catch (Throwable e) { + adapter.rollbackEntityManager(null,manager); + throw IOExceptionSupport.create(e); + } + adapter.commitEntityManager(null,manager); + } + + public void removeAllMessages(ConnectionContext context) throws IOException { + EntityManager manager = adapter.beginEntityManager(context); + try { + Query query = manager.createQuery("delete from StoredMessage m where m.destination=?1"); + query.setParameter(1, destinationName); + query.executeUpdate(); + } catch (Throwable e) { + adapter.rollbackEntityManager(context,manager); + throw IOExceptionSupport.create(e); + } + adapter.commitEntityManager(context,manager); + } + + public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { + EntityManager manager = adapter.beginEntityManager(context); + try { + Query query = manager.createQuery("delete from StoredMessage m where m.id=?1"); + query.setParameter(1, ack.getLastMessageId().getBrokerSequenceId()); + query.executeUpdate(); + } catch (Throwable e) { + adapter.rollbackEntityManager(context,manager); + throw IOExceptionSupport.create(e); + } + adapter.commitEntityManager(context,manager); + } + + public void resetBatching() { + lastMessageId.set(-1); + } + + public void setUsageManager(UsageManager usageManager) { + } + + public void start() throws Exception { + } + + public void stop() throws Exception { + } + +} Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java?view=auto&rev=477273 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java Mon Nov 20 10:13:50 2006 @@ -0,0 +1,253 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.jpa; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import javax.persistence.EntityManager; +import javax.persistence.EntityManagerFactory; +import javax.persistence.Persistence; +import javax.persistence.Query; + +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.memory.UsageManager; +import org.apache.activemq.openwire.OpenWireFormatFactory; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.store.TransactionStore; +import org.apache.activemq.store.memory.MemoryTransactionStore; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.wireformat.WireFormat; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * An implementation of [EMAIL PROTECTED] PersistenceAdapter} that uses JPA to + * store it's messages. + * + * @org.apache.xbean.XBean + * + * @version $Revision: 1.17 $ + */ +public class JPAPersistenceAdapter implements PersistenceAdapter { + + private static final Log log = LogFactory.getLog(JPAPersistenceAdapter.class); + String entityManagerName = "activemq"; + Properties entityManagerProperties = System.getProperties(); + EntityManagerFactory entityManagerFactory; + private WireFormat wireFormat; + private MemoryTransactionStore transactionStore; + + public void beginTransaction(ConnectionContext context) throws IOException { + if( context.getLongTermStoreContext()!=null ) + throw new IOException("Transation already started."); + + EntityManager manager = getEntityManagerFactory().createEntityManager(); + manager.getTransaction().begin(); + context.setLongTermStoreContext(manager); + } + + public void commitTransaction(ConnectionContext context) throws IOException { + EntityManager manager = (EntityManager) context.getLongTermStoreContext(); + if( manager==null ) + throw new IOException("Transation not started."); + context.setLongTermStoreContext(null); + manager.getTransaction().commit(); + manager.close(); + } + + public void rollbackTransaction(ConnectionContext context) throws IOException { + EntityManager manager = (EntityManager) context.getLongTermStoreContext(); + if( manager==null ) + throw new IOException("Transation not started."); + context.setLongTermStoreContext(null); + manager.getTransaction().rollback(); + manager.close(); + } + + public EntityManager beginEntityManager(ConnectionContext context) { + if( context==null || context.getLongTermStoreContext()==null ) { + EntityManager manager = getEntityManagerFactory().createEntityManager(); + manager.getTransaction().begin(); + return manager; + } else { + return (EntityManager) context.getLongTermStoreContext(); + } + } + + public void commitEntityManager(ConnectionContext context, EntityManager manager) { + if( context==null || context.getLongTermStoreContext()==null ) { + manager.getTransaction().commit(); + manager.close(); + } + } + + public void rollbackEntityManager(ConnectionContext context, EntityManager manager) { + if( context==null || context.getLongTermStoreContext()==null ) { + manager.getTransaction().rollback(); + manager.close(); + } + } + + public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { + MessageStore rc = new JPAMessageStore(this, destination); + if (transactionStore != null) { + rc = transactionStore.proxy(rc); + } + return rc; + } + + public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { + TopicMessageStore rc = new JPATopicMessageStore(this, destination); + if (transactionStore != null) { + rc = transactionStore.proxy(rc); + } + return rc; + } + + public TransactionStore createTransactionStore() throws IOException { + if (transactionStore == null) { + transactionStore = new MemoryTransactionStore(); + } + return this.transactionStore; + } + + public void deleteAllMessages() throws IOException { + EntityManager manager = beginEntityManager(null); + try { + Query query = manager.createQuery("delete from StoredMessage m"); + query.executeUpdate(); + query = manager.createQuery("delete from StoredSubscription ss"); + query.executeUpdate(); + } catch (Throwable e) { + rollbackEntityManager(null,manager); + throw IOExceptionSupport.create(e); + } + commitEntityManager(null,manager); + } + + public Set getDestinations() { + HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); + + EntityManager manager = beginEntityManager(null); + try { + Query query = manager.createQuery("select distinct m.destination from StoredMessage m"); + for (String dest : (List<String>)query.getResultList()) { + rc.add(ActiveMQDestination.createDestination(dest,ActiveMQDestination.QUEUE_TYPE)); + } + } catch (RuntimeException e) { + rollbackEntityManager(null,manager); + throw e; + } + commitEntityManager(null,manager); + return rc; + } + + public long getLastMessageBrokerSequenceId() throws IOException { + long rc=0; + EntityManager manager = beginEntityManager(null); + try { + Query query = manager.createQuery("select max(m.id) from StoredMessage m"); + Long t = (Long) query.getSingleResult(); + if( t != null ) { + rc = t; + } + } catch (Throwable e) { + rollbackEntityManager(null,manager); + throw IOExceptionSupport.create(e); + } + commitEntityManager(null,manager); + return rc; + } + + public boolean isUseExternalMessageReferences() { + return false; + } + + public void setUsageManager(UsageManager usageManager) { + } + + public void setUseExternalMessageReferences(boolean enable) { + if( enable ) { + throw new IllegalArgumentException("This persistence adapter does not support externa message references"); + } + } + + public void start() throws Exception { + } + + public void stop() throws Exception { + if( entityManagerFactory !=null ) { + entityManagerFactory.close(); + } + } + + public EntityManagerFactory getEntityManagerFactory() { + if( entityManagerFactory == null ) { + entityManagerFactory = createEntityManagerFactory(); + } + return entityManagerFactory; + } + protected EntityManagerFactory createEntityManagerFactory() { + return Persistence.createEntityManagerFactory(getEntityManagerName(), getEntityManagerProperties()); + } + + public void setEntityManagerFactory(EntityManagerFactory entityManagerFactory) { + this.entityManagerFactory = entityManagerFactory; + } + + public Properties getEntityManagerProperties() { + return entityManagerProperties; + } + public void setEntityManagerProperties( + Properties entityManagerProperties) { + this.entityManagerProperties = entityManagerProperties; + } + + public String getEntityManagerName() { + return entityManagerName; + } + public void setEntityManagerName(String entityManager) { + this.entityManagerName = entityManager; + } + + public WireFormat getWireFormat() { + if(wireFormat==null) { + wireFormat = createWireFormat(); + } + return wireFormat; + } + + private WireFormat createWireFormat() { + OpenWireFormatFactory wff = new OpenWireFormatFactory(); + return wff.createWireFormat(); + } + + public void setWireFormat(WireFormat wireFormat) { + this.wireFormat = wireFormat; + } + +} Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java ------------------------------------------------------------------------------ svn:executable = * Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java?view=auto&rev=477273 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java Mon Nov 20 10:13:50 2006 @@ -0,0 +1,233 @@ +package org.apache.activemq.store.jpa; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import javax.persistence.EntityManager; +import javax.persistence.Query; + +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.SubscriptionInfo; +import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.store.jpa.model.StoredMessage; +import org.apache.activemq.store.jpa.model.StoredSubscription; +import org.apache.activemq.store.jpa.model.StoredSubscription.SubscriptionId; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.IOExceptionSupport; + +public class JPATopicMessageStore extends JPAMessageStore implements TopicMessageStore { + private Map<SubscriptionId,AtomicLong> subscriberLastMessageMap=new ConcurrentHashMap<SubscriptionId,AtomicLong>(); + + public JPATopicMessageStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) { + super(adapter, destination); + } + + public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException { + EntityManager manager = adapter.beginEntityManager(context); + try { + StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName); + ss.setLastAckedId(messageId.getBrokerSequenceId()); + } catch (Throwable e) { + adapter.rollbackEntityManager(context,manager); + throw IOExceptionSupport.create(e); + } + adapter.commitEntityManager(context,manager); + } + + public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException { + EntityManager manager = adapter.beginEntityManager(null); + try { + StoredSubscription ss = new StoredSubscription(); + ss.setClientId(clientId); + ss.setSubscriptionName(subscriptionName); + ss.setDestination(destinationName); + ss.setSelector(selector); + ss.setLastAckedId(-1); + + if( !retroactive ) { + Query query = manager.createQuery("select max(m.id) from StoredMessage m"); + Long rc = (Long) query.getSingleResult(); + if( rc != null ) { + ss.setLastAckedId(rc); + } + } + + manager.persist(ss); + } catch (Throwable e) { + adapter.rollbackEntityManager(null,manager); + throw IOExceptionSupport.create(e); + } + adapter.commitEntityManager(null,manager); + } + + public void deleteSubscription(String clientId, String subscriptionName) throws IOException { + EntityManager manager = adapter.beginEntityManager(null); + try { + StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName); + manager.remove(ss); + } catch (Throwable e) { + adapter.rollbackEntityManager(null,manager); + throw IOExceptionSupport.create(e); + } + adapter.commitEntityManager(null,manager); + } + + private StoredSubscription findStoredSubscription(EntityManager manager, String clientId, String subscriptionName) { + Query query = manager.createQuery( + "select ss from StoredSubscription ss " + + "where ss.clientId=?1 " + + "and ss.subscriptionName=?2 " + + "and ss.destination=?3"); + query.setParameter(1, clientId); + query.setParameter(2, subscriptionName); + query.setParameter(3, destinationName); + List<StoredSubscription> resultList = query.getResultList(); + if( resultList.isEmpty() ) + return null; + return resultList.get(0); + } + + public SubscriptionInfo[] getAllSubscriptions() throws IOException { + SubscriptionInfo rc[]; + EntityManager manager = adapter.beginEntityManager(null); + try { + ArrayList<SubscriptionInfo> l = new ArrayList<SubscriptionInfo>(); + + Query query = manager.createQuery("select ss from StoredSubscription ss where ss.destination=?1"); + query.setParameter(1, destinationName); + for (StoredSubscription ss : (List<StoredSubscription>)query.getResultList()) { + SubscriptionInfo info = new SubscriptionInfo(); + info.setClientId(ss.getClientId()); + info.setDestination(destination); + info.setSelector(ss.getSelector()); + info.setSubcriptionName(ss.getSubscriptionName()); + l.add(info); + } + + rc = new SubscriptionInfo[l.size()]; + l.toArray(rc); + } catch (Throwable e) { + adapter.rollbackEntityManager(null,manager); + throw IOExceptionSupport.create(e); + } + adapter.commitEntityManager(null,manager); + return rc; + } + + public int getMessageCount(String clientId, String subscriptionName) throws IOException { + Integer rc; + EntityManager manager = adapter.beginEntityManager(null); + try { + Query query = manager.createQuery( + "select count(m) FROM StoredMessage m, StoredSubscription ss " + + "where ss.clientId=?1 " + + "and ss.subscriptionName=?2 " + + "and ss.destination=?3 " + + "and m.desination=ss.destination and m.id > ss.lastAckedId"); + query.setParameter(1, clientId); + query.setParameter(2, subscriptionName); + query.setParameter(2, destinationName); + rc = (Integer) query.getSingleResult(); + } catch (Throwable e) { + adapter.rollbackEntityManager(null,manager); + throw IOExceptionSupport.create(e); + } + adapter.commitEntityManager(null,manager); + return rc; + } + + public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { + SubscriptionInfo rc=null; + EntityManager manager = adapter.beginEntityManager(null); + try { + StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName); + if( ss != null ) { + rc = new SubscriptionInfo(); + rc.setClientId(ss.getClientId()); + rc.setDestination(destination); + rc.setSelector(ss.getSelector()); + rc.setSubcriptionName(ss.getSubscriptionName()); + } + } catch (Throwable e) { + adapter.rollbackEntityManager(null,manager); + throw IOExceptionSupport.create(e); + } + adapter.commitEntityManager(null,manager); + return rc; + } + + public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception { + EntityManager manager = adapter.beginEntityManager(null); + try { + SubscriptionId id = new SubscriptionId(); + id.setClientId(clientId); + id.setSubscriptionName(subscriptionName); + id.setDestination(destinationName); + + AtomicLong last=subscriberLastMessageMap.get(id); + if(last==null){ + StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName); + last=new AtomicLong(ss.getLastAckedId()); + subscriberLastMessageMap.put(id,last); + } + final AtomicLong lastMessageId=last; + + Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc"); + query.setParameter(1, destinationName); + query.setParameter(2, lastMessageId.get()); + query.setMaxResults(maxReturned); + int count = 0; + for (StoredMessage m : (List<StoredMessage>)query.getResultList()) { + Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData())); + listener.recoverMessage(message); + lastMessageId.set(m.getId()); + count++; + if( count >= maxReturned ) { + return; + } + } + } catch (Throwable e) { + adapter.rollbackEntityManager(null,manager); + throw IOExceptionSupport.create(e); + } + adapter.commitEntityManager(null,manager); + } + + public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception { + EntityManager manager = adapter.beginEntityManager(null); + try { + + StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName); + + Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc"); + query.setParameter(1, destinationName); + query.setParameter(2, ss.getLastAckedId()); + for (StoredMessage m : (List<StoredMessage>)query.getResultList()) { + Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData())); + listener.recoverMessage(message); + } + } catch (Throwable e) { + adapter.rollbackEntityManager(null,manager); + throw IOExceptionSupport.create(e); + } + adapter.commitEntityManager(null,manager); + } + + public void resetBatching(String clientId, String subscriptionName) { + SubscriptionId id = new SubscriptionId(); + id.setClientId(clientId); + id.setSubscriptionName(subscriptionName); + id.setDestination(destinationName); + + subscriberLastMessageMap.remove(id); + } + +} Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java?view=auto&rev=477273 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java Mon Nov 20 10:13:50 2006 @@ -0,0 +1,85 @@ +/* + * Copyright 2006 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.jpa.model; + +import javax.persistence.Basic; +import javax.persistence.Entity; +import javax.persistence.Id; + +/** + */ [EMAIL PROTECTED] +public class StoredMessage { + + @Id + private long id; + + @Basic + private String messageId; + + @Basic + private String destination; + + @Basic + private long exiration; + + @Basic + private byte[] data; + + public StoredMessage() { + } + + public byte[] getData() { + return data; + } + + public void setData(byte[] data) { + this.data = data; + } + + public String getDestination() { + return destination; + } + + public void setDestination(String destination) { + this.destination = destination; + } + + public long getExiration() { + return exiration; + } + + public void setExiration(long exiration) { + this.exiration = exiration; + } + + public String getMessageId() { + return messageId; + } + + public void setMessageId(String messageId) { + this.messageId = messageId; + } + + public long getId() { + return id; + } + + public void setId(long sequenceId) { + this.id = sequenceId; + } + +} Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java?view=auto&rev=477273 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java Mon Nov 20 10:13:50 2006 @@ -0,0 +1,153 @@ +/* + * Copyright 2006 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.jpa.model; + +import javax.persistence.Basic; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; + +/** + */ [EMAIL PROTECTED] +public class StoredSubscription { + + /** + * Application identity class for Magazine. + */ + public static class SubscriptionId { + + public String destination; + public String clientId; + public String subscriptionName; + + public boolean equals(Object other) { + if (other == this) + return true; + if (!(other instanceof SubscriptionId)) + return false; + + SubscriptionId sid = (SubscriptionId) other; + return (destination == sid.destination || (destination != null && destination.equals(sid.destination))) + && (clientId == sid.clientId || (clientId != null && clientId.equals(sid.clientId))) + && (subscriptionName == sid.subscriptionName || (subscriptionName != null && subscriptionName.equals(sid.subscriptionName))); + } + + /** + * Hashcode must also depend on identity values. + */ + public int hashCode() { + return ((destination == null) ? 0 : destination.hashCode()) + ^ ((clientId == null) ? 0 : clientId.hashCode()) + ^ ((subscriptionName == null) ? 0 : subscriptionName.hashCode()) + ; + } + + public String toString() { + return destination + ":" + clientId + ":" + subscriptionName; + } + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public String getDestination() { + return destination; + } + + public void setDestination(String destination) { + this.destination = destination; + } + + public String getSubscriptionName() { + return subscriptionName; + } + + public void setSubscriptionName(String subscriptionName) { + this.subscriptionName = subscriptionName; + } + } + + @Id + @GeneratedValue(strategy=GenerationType.AUTO) + private long id; + + @Basic + private String destination; + @Basic + private String clientId; + @Basic + private String subscriptionName; + + @Basic + private long lastAckedId; + @Basic + private String selector; + + + public long getLastAckedId() { + return lastAckedId; + } + + public void setLastAckedId(long lastAckedId) { + this.lastAckedId = lastAckedId; + } + + public String getSelector() { + return selector; + } + + public void setSelector(String selector) { + this.selector = selector; + } + + public String getDestination() { + return destination; + } + + public void setDestination(String destination) { + this.destination = destination; + } + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public String getSubscriptionName() { + return subscriptionName; + } + + public void setSubscriptionName(String subscriptionName) { + this.subscriptionName = subscriptionName; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } +} Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java?view=diff&rev=477273&r1=477272&r2=477273 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java Mon Nov 20 10:13:50 2006 @@ -54,5 +54,14 @@ public void setOffset(int offset) { this.offset = offset; } + + public void compact() { + if( length != data.length ) { + byte t[] = new byte[length]; + System.arraycopy(data, offset, t, 0, length); + data=t; + offset=0; + } + } } Added: incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml?view=auto&rev=477273 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml (added) +++ incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml Mon Nov 20 10:13:50 2006 @@ -0,0 +1,28 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Copyright 2006 The Apache Software Foundation. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<persistence xmlns="http://java.sun.com/xml/ns/persistence" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + version="1.0"> + <persistence-unit name="activemq" transaction-type="RESOURCE_LOCAL"> + <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider> + <class>org.apache.activemq.store.jpa.model.StoredMessage</class> + <class>org.apache.activemq.store.jpa.model.StoredSubscription</class> + <!-- + <class>org.apache.activemq.store.jpa.model.StoredSubscription$SubscriptionId</class> + --> + </persistence-unit> +</persistence> Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java?view=auto&rev=477273 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java (added) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java Mon Nov 20 10:13:50 2006 @@ -0,0 +1,71 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.store; + +import java.util.Properties; + +import junit.framework.Test; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.RecoveryBrokerTest; +import org.apache.activemq.store.jpa.JPAPersistenceAdapter; + +/** + * Used to verify that recovery works correctly against + * + * @version $Revision$ + */ +public class JPARecoveryBrokerTest extends RecoveryBrokerTest { + + protected BrokerService createBroker() throws Exception { + BrokerService service = new BrokerService(); + service.setDeleteAllMessagesOnStartup(true); + JPAPersistenceAdapter pa = new JPAPersistenceAdapter(); + Properties props = new Properties(); + props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver"); + props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true"); + props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema"); + props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE"); + pa.setEntityManagerProperties(props); + service.setPersistenceAdapter(pa); + return service; + + } + + protected BrokerService createRestartedBroker() throws Exception { + BrokerService service = new BrokerService(); + JPAPersistenceAdapter pa = new JPAPersistenceAdapter(); + Properties props = new Properties(); + props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver"); + props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true"); + props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema"); + props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE"); + pa.setEntityManagerProperties(props); + service.setPersistenceAdapter(pa); + return service; + } + + public static Test suite() { + return suite(JPARecoveryBrokerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + +} Modified: incubator/activemq/trunk/pom.xml URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/pom.xml?view=diff&rev=477273&r1=477272&r2=477273 ============================================================================== --- incubator/activemq/trunk/pom.xml (original) +++ incubator/activemq/trunk/pom.xml Mon Nov 20 10:13:50 2006 @@ -314,6 +314,12 @@ <version>${commons-collections-version}</version> </dependency> + <dependency> + <groupId>org.apache.openjpa</groupId> + <artifactId>openjpa-persistence-jdbc</artifactId> + <version>${openjpa-version}</version> + </dependency> + <!-- Optional Spring Support --> <dependency> <groupId>org.springframework</groupId> @@ -868,8 +874,9 @@ <axis-version>1.2-RC1</axis-version> <cglib-version>2.0</cglib-version> <commons-beanutils-version>1.6.1</commons-beanutils-version> - <commons-collections-version>2.1</commons-collections-version> - <commons-dbcp-version>1.2</commons-dbcp-version> + <commons-collections-version>3.1</commons-collections-version> + <openjpa-version>0.9.6-incubating</openjpa-version> + <commons-dbcp-version>1.2.1</commons-dbcp-version> <commons-httpclient-version>2.0.1</commons-httpclient-version> <commons-logging-version>1.1</commons-logging-version> <commons-pool-version>1.2</commons-pool-version> @@ -887,7 +894,7 @@ <junit-version>3.8.1</junit-version> <jxta-version>2.0</jxta-version> <log4j-version>1.2.12</log4j-version> - <org-apache-derby-version>10.1.1.0</org-apache-derby-version> + <org-apache-derby-version>10.1.3.1</org-apache-derby-version> <org-apache-geronimo-specs-version>1.0</org-apache-geronimo-specs-version> <org-apache-maven-surefire-plugin-version>2.2</org-apache-maven-surefire-plugin-version> <p2psockets-version>1.1.2</p2psockets-version>