Author: chirino
Date: Mon Nov 20 10:13:50 2006
New Revision: 477273

URL: http://svn.apache.org/viewvc?view=rev&rev=477273
Log:
Added the initial cut of a JPA based message store.


Added:
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java
   (with props)
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java
    
incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml
    
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java
Modified:
    incubator/activemq/trunk/activemq-core/pom.xml
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java
    incubator/activemq/trunk/pom.xml

Modified: incubator/activemq/trunk/activemq-core/pom.xml
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/pom.xml?view=diff&rev=477273&r1=477272&r2=477273
==============================================================================
--- incubator/activemq/trunk/activemq-core/pom.xml (original)
+++ incubator/activemq/trunk/activemq-core/pom.xml Mon Nov 20 10:13:50 2006
@@ -156,6 +156,10 @@
       <version>1.2.24</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.openjpa</groupId>
+      <artifactId>openjpa-persistence-jdbc</artifactId>
+    </dependency>    
   </dependencies>
 
   <build>
@@ -270,6 +274,7 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-antrun-plugin</artifactId>
+        <!-- 
         <configuration>
           <tasks>
             <taskdef name="generate" 
classname="org.apache.activemq.openwire.tool.JavaGeneratorTask"/>
@@ -283,33 +288,38 @@
             <version>${activemq-version}</version>
           </dependency>
         </dependencies>
+         -->
+        
+        <executions>
+          <execution>
+            <phase>process-classes</phase>
+            <configuration>
+              <tasks>
+                <path id="cp">
+                  <path refid="maven.test.classpath"/>
+                  <path refid="maven.compile.classpath"/>
+                  <path refid="maven.dependency.classpath"/>
+                </path>
+                <taskdef name="openjpac" 
classname="org.apache.openjpa.ant.PCEnhancerTask">
+                  <classpath refid="cp"/>
+                </taskdef>
+                <openjpac directory="${basedir}/target/jpa-classes">
+                  <classpath refid="cp"/>
+                  <fileset dir="${basedir}/target/classes">
+                    <include 
name="org/apache/activemq/store/jpa/model/*.class"/>
+                  </fileset>
+                </openjpac>
+                <copy todir="${basedir}/target/classes">
+                  <fileset dir="${basedir}/target/jpa-classes"/>
+                </copy>
+              </tasks>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
       </plugin>
-
-      <!-- Use Gram to Gernerate the OpenWire Marshallers -->
-      <!--
-      <plugin>
-        <groupId>org.apache.activemq</groupId>
-        <artifactId>maven-gram-plugin</artifactId>
-        <version>4.1-incubator</version>
-        <configuration>
-          <scripts>
-            :GenerateJavaMarshalling.groovy: GenerateJavaTests.groovy: 
GenerateCSharpMarshalling.groovy:
-            GenerateCSharpClasses.groovy: 
GenerateCppMarshallingClasses.groovy: GenerateCppMarshallingHeaders.groovy:
-            GenerateCppHeaders.groovy: GenerateCppClasses.groovy: 
GenerateCMarshalling.groovy:
-          </scripts>
-          <groovyProperties>
-            <version>2</version>
-          </groovyProperties>
-        </configuration>
-        <dependencies>
-          <dependency>
-            <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-openwire-generator</artifactId>
-            <version>${activemq-version}</version>
-          </dependency>
-        </dependencies>
-      </plugin>
-      -->
 
       <plugin>
         <groupId>org.codehaus.mojo</groupId>

Added: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java?view=auto&rev=477273
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java
 Mon Nov 20 10:13:50 2006
@@ -0,0 +1,194 @@
+package org.apache.activemq.store.jpa;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.jpa.model.StoredMessage;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+public class JPAMessageStore implements MessageStore {
+
+       protected final JPAPersistenceAdapter adapter;
+       protected final WireFormat wireFormat;
+       protected final ActiveMQDestination destination;
+       protected final String destinationName;
+    protected AtomicLong lastMessageId = new AtomicLong(-1);
+
+       public JPAMessageStore(JPAPersistenceAdapter adapter, 
ActiveMQDestination destination) {
+               this.adapter = adapter;
+               this.destination = destination;
+               this.destinationName = destination.getQualifiedName();
+               this.wireFormat = this.adapter.getWireFormat();
+       }
+
+       public void addMessage(ConnectionContext context, Message message) 
throws IOException {
+               
+               EntityManager manager = adapter.beginEntityManager(context);
+               try {
+                       
+                       ByteSequence sequence = wireFormat.marshal(message);
+                       sequence.compact();
+                       
+                       StoredMessage sm = new StoredMessage();
+                       sm.setDestination(destinationName);
+                       sm.setId(message.getMessageId().getBrokerSequenceId());
+                       sm.setMessageId(message.getMessageId().toString());
+                       sm.setExiration(message.getExpiration());
+                       sm.setData(sequence.data);
+               
+                       manager.persist(sm);
+                       
+               } catch (Throwable e) {
+                       adapter.rollbackEntityManager(context,manager);
+                       throw IOExceptionSupport.create(e);
+               }
+               adapter.commitEntityManager(context,manager);           
+       }
+
+       public void addMessageReference(ConnectionContext context,
+                       MessageId messageId, long expirationTime, String 
messageRef)
+                       throws IOException {
+               throw new IOException("Not implemented.");
+       }
+
+       public ActiveMQDestination getDestination() {
+               return destination;
+       }
+
+       public Message getMessage(MessageId identity) throws IOException {
+               Message rc;
+               EntityManager manager = adapter.beginEntityManager(null);
+               try {
+                       StoredMessage message=null;
+                       if( identity.getBrokerSequenceId()!= 0 ) {
+                               message = manager.find(StoredMessage.class, 
identity.getBrokerSequenceId());                    
+                       } else {
+                               Query query = manager.createQuery("select m 
from StoredMessage m where m.messageId=?1");
+                               query.setParameter(1, identity.toString());
+                               message = (StoredMessage) 
query.getSingleResult();
+                       }
+                       
+                       rc = (Message) wireFormat.unmarshal(new 
ByteSequence(message.getData()));
+               } catch (Throwable e) {
+                       adapter.rollbackEntityManager(null,manager);
+                       throw IOExceptionSupport.create(e);
+               }
+               adapter.commitEntityManager(null,manager);
+               return rc;
+       }
+
+       public int getMessageCount() throws IOException {
+               Integer rc;
+               EntityManager manager = adapter.beginEntityManager(null);
+               try {
+                       Query query = manager.createQuery("select count(m) from 
StoredMessage m");
+                       rc = (Integer) query.getSingleResult();
+               } catch (Throwable e) {
+                       adapter.rollbackEntityManager(null,manager);
+                       throw IOExceptionSupport.create(e);
+               }
+               adapter.commitEntityManager(null,manager);
+               return rc;
+       }
+
+       public String getMessageReference(MessageId identity) throws 
IOException {
+               throw new IOException("Not implemented.");
+       }
+
+       public void recover(MessageRecoveryListener container) throws Exception 
{
+               EntityManager manager = adapter.beginEntityManager(null);
+               try {
+                       Query query = manager.createQuery("select m from 
StoredMessage m where m.destination=?1 order by m.id asc");
+                       query.setParameter(1, destinationName);
+                       for (StoredMessage m : 
(List<StoredMessage>)query.getResultList()) {
+                               Message message = (Message) 
wireFormat.unmarshal(new ByteSequence(m.getData()));
+                               container.recoverMessage(message);
+               }
+               } catch (Throwable e) {
+                       adapter.rollbackEntityManager(null,manager);
+                       throw IOExceptionSupport.create(e);
+               }
+               adapter.commitEntityManager(null,manager);
+       }
+
+       public void recoverNextMessages(int maxReturned, 
MessageRecoveryListener listener) throws Exception {
+               
+               EntityManager manager = adapter.beginEntityManager(null);
+               try {
+                       
+                       Query query = manager.createQuery("select m from 
StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
+                       query.setParameter(1, destinationName);
+                       query.setParameter(2, lastMessageId.get());
+                       query.setMaxResults(maxReturned);
+                       int count = 0;
+                       for (StoredMessage m : 
(List<StoredMessage>)query.getResultList()) {
+                               Message message = (Message) 
wireFormat.unmarshal(new ByteSequence(m.getData()));
+                               listener.recoverMessage(message);
+                               lastMessageId.set(m.getId());
+                               count++;
+                               if( count >= maxReturned ) { 
+                                       return;
+                               }
+               }
+
+               } catch (Throwable e) {
+                       adapter.rollbackEntityManager(null,manager);
+                       throw IOExceptionSupport.create(e);
+               }
+               adapter.commitEntityManager(null,manager);
+       }
+
+       public void removeAllMessages(ConnectionContext context) throws 
IOException {
+               EntityManager manager = adapter.beginEntityManager(context);
+               try {
+                       Query query = manager.createQuery("delete from 
StoredMessage m where m.destination=?1");
+                       query.setParameter(1, destinationName);
+                       query.executeUpdate();
+               } catch (Throwable e) {
+                       adapter.rollbackEntityManager(context,manager);
+                       throw IOExceptionSupport.create(e);
+               }
+               adapter.commitEntityManager(context,manager);
+       }
+
+       public void removeMessage(ConnectionContext context, MessageAck ack) 
throws IOException {
+               EntityManager manager = adapter.beginEntityManager(context);
+               try {
+                       Query query = manager.createQuery("delete from 
StoredMessage m where m.id=?1");
+                       query.setParameter(1, 
ack.getLastMessageId().getBrokerSequenceId());
+                       query.executeUpdate();
+               } catch (Throwable e) {
+                       adapter.rollbackEntityManager(context,manager);
+                       throw IOExceptionSupport.create(e);
+               }
+               adapter.commitEntityManager(context,manager);
+       }
+
+       public void resetBatching() {
+        lastMessageId.set(-1);
+       }
+
+       public void setUsageManager(UsageManager usageManager) {
+       }
+
+       public void start() throws Exception {
+       }
+
+       public void stop() throws Exception {
+       }
+
+}

Added: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java?view=auto&rev=477273
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java
 Mon Nov 20 10:13:50 2006
@@ -0,0 +1,253 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jpa;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.Persistence;
+import javax.persistence.Query;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.memory.MemoryTransactionStore;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An implementation of [EMAIL PROTECTED] PersistenceAdapter} that uses JPA to
+ * store it's messages.
+ * 
+ * @org.apache.xbean.XBean
+ * 
+ * @version $Revision: 1.17 $
+ */
+public class JPAPersistenceAdapter implements PersistenceAdapter {
+
+    private static final Log log = 
LogFactory.getLog(JPAPersistenceAdapter.class);
+    String entityManagerName = "activemq";
+       Properties entityManagerProperties = System.getProperties();
+       EntityManagerFactory entityManagerFactory;
+       private WireFormat wireFormat;
+       private MemoryTransactionStore transactionStore;
+       
+       public void beginTransaction(ConnectionContext context) throws 
IOException {
+               if( context.getLongTermStoreContext()!=null )
+                       throw new IOException("Transation already started.");
+               
+               EntityManager manager = 
getEntityManagerFactory().createEntityManager();                
+               manager.getTransaction().begin();
+               context.setLongTermStoreContext(manager);
+       }
+
+       public void commitTransaction(ConnectionContext context) throws 
IOException {
+               EntityManager manager = (EntityManager) 
context.getLongTermStoreContext();
+               if( manager==null )
+                       throw new IOException("Transation not started.");
+               context.setLongTermStoreContext(null);
+               manager.getTransaction().commit();
+               manager.close();
+       }
+       
+       public void rollbackTransaction(ConnectionContext context) throws 
IOException {
+               EntityManager manager = (EntityManager) 
context.getLongTermStoreContext();
+               if( manager==null )
+                       throw new IOException("Transation not started.");
+               context.setLongTermStoreContext(null);
+               manager.getTransaction().rollback();
+               manager.close();
+       }
+       
+       public EntityManager beginEntityManager(ConnectionContext context) {
+               if( context==null || context.getLongTermStoreContext()==null ) {
+                       EntityManager manager = 
getEntityManagerFactory().createEntityManager();                
+                       manager.getTransaction().begin();
+                       return manager;
+               } else {
+                       return (EntityManager) 
context.getLongTermStoreContext();
+               }
+       }
+       
+       public void commitEntityManager(ConnectionContext context, 
EntityManager manager) {             
+               if( context==null || context.getLongTermStoreContext()==null ) {
+                       manager.getTransaction().commit();
+                       manager.close();
+               } 
+       }
+       
+       public void rollbackEntityManager(ConnectionContext context, 
EntityManager manager) {           
+               if( context==null || context.getLongTermStoreContext()==null ) {
+                       manager.getTransaction().rollback();
+                       manager.close();
+               } 
+       }
+
+       public MessageStore createQueueMessageStore(ActiveMQQueue destination) 
throws IOException {
+               MessageStore rc =  new JPAMessageStore(this, destination);
+        if (transactionStore != null) {
+            rc = transactionStore.proxy(rc);
+        }
+        return rc;
+       }
+
+       public TopicMessageStore createTopicMessageStore(ActiveMQTopic 
destination) throws IOException {
+               TopicMessageStore rc = new JPATopicMessageStore(this, 
destination);
+        if (transactionStore != null) {
+            rc = transactionStore.proxy(rc);
+        }
+        return rc;
+       }
+
+       public TransactionStore createTransactionStore() throws IOException {
+        if (transactionStore == null) {
+            transactionStore = new MemoryTransactionStore();
+        }
+        return this.transactionStore;
+       }
+
+       public void deleteAllMessages() throws IOException {
+               EntityManager manager = beginEntityManager(null);
+               try {
+                       Query query = manager.createQuery("delete from 
StoredMessage m");
+                       query.executeUpdate();
+                       query = manager.createQuery("delete from 
StoredSubscription ss");
+                       query.executeUpdate();
+               } catch (Throwable e) {
+                       rollbackEntityManager(null,manager);
+                       throw IOExceptionSupport.create(e);
+               }
+               commitEntityManager(null,manager);              
+       }
+
+       public Set getDestinations() {
+               HashSet<ActiveMQDestination> rc = new 
HashSet<ActiveMQDestination>();
+               
+               EntityManager manager = beginEntityManager(null);
+               try {
+                       Query query = manager.createQuery("select distinct 
m.destination from StoredMessage m");
+                       for (String dest : (List<String>)query.getResultList()) 
{
+                               
rc.add(ActiveMQDestination.createDestination(dest,ActiveMQDestination.QUEUE_TYPE));
+               }
+               } catch (RuntimeException e) {
+                       rollbackEntityManager(null,manager);
+                       throw e;
+               }
+               commitEntityManager(null,manager);              
+               return rc;
+       }
+
+       public long getLastMessageBrokerSequenceId() throws IOException {
+               long rc=0;
+               EntityManager manager = beginEntityManager(null);
+               try {
+                       Query query = manager.createQuery("select max(m.id) 
from StoredMessage m");
+                       Long t = (Long) query.getSingleResult();
+                       if( t != null ) {
+                               rc = t;
+                       }
+               } catch (Throwable e) {
+                       rollbackEntityManager(null,manager);
+                       throw IOExceptionSupport.create(e);
+               }
+               commitEntityManager(null,manager);              
+               return rc;
+       }
+
+       public boolean isUseExternalMessageReferences() {
+               return false;
+       }
+
+       public void setUsageManager(UsageManager usageManager) {
+       }
+
+       public void setUseExternalMessageReferences(boolean enable) {
+               if( enable ) {
+                       throw new IllegalArgumentException("This persistence 
adapter does not support externa message references");
+               }
+       }
+
+       public void start() throws Exception {
+       }
+
+       public void stop() throws Exception {
+               if( entityManagerFactory !=null ) {
+                       entityManagerFactory.close();
+               }
+       }
+
+       public EntityManagerFactory getEntityManagerFactory() {
+               if( entityManagerFactory == null ) {
+                       entityManagerFactory = createEntityManagerFactory();
+               }
+               return entityManagerFactory;
+       }
+       protected EntityManagerFactory createEntityManagerFactory() {
+               return 
Persistence.createEntityManagerFactory(getEntityManagerName(), 
getEntityManagerProperties());
+       }
+
+       public void setEntityManagerFactory(EntityManagerFactory 
entityManagerFactory) {
+               this.entityManagerFactory = entityManagerFactory;
+       }
+
+       public Properties getEntityManagerProperties() {
+               return entityManagerProperties;
+       }
+       public void setEntityManagerProperties(
+                       Properties entityManagerProperties) {
+               this.entityManagerProperties = entityManagerProperties;
+       }
+
+       public String getEntityManagerName() {
+               return entityManagerName;
+       }
+       public void setEntityManagerName(String entityManager) {
+               this.entityManagerName = entityManager;
+       }
+
+       public WireFormat getWireFormat() {
+               if(wireFormat==null) {
+                       wireFormat = createWireFormat();
+               }
+               return wireFormat;
+       }
+
+       private WireFormat createWireFormat() {
+               OpenWireFormatFactory wff = new OpenWireFormatFactory();
+               return wff.createWireFormat(); 
+       }
+
+       public void setWireFormat(WireFormat wireFormat) {
+               this.wireFormat = wireFormat;
+       }
+
+}

Propchange: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java
------------------------------------------------------------------------------
    svn:executable = *

Added: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java?view=auto&rev=477273
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java
 Mon Nov 20 10:13:50 2006
@@ -0,0 +1,233 @@
+package org.apache.activemq.store.jpa;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.jpa.model.StoredMessage;
+import org.apache.activemq.store.jpa.model.StoredSubscription;
+import org.apache.activemq.store.jpa.model.StoredSubscription.SubscriptionId;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+
+public class JPATopicMessageStore extends JPAMessageStore implements 
TopicMessageStore {
+    private Map<SubscriptionId,AtomicLong> subscriberLastMessageMap=new 
ConcurrentHashMap<SubscriptionId,AtomicLong>();
+
+       public JPATopicMessageStore(JPAPersistenceAdapter adapter, 
ActiveMQDestination destination) {
+               super(adapter, destination);
+       }
+
+       public void acknowledge(ConnectionContext context, String clientId, 
String subscriptionName, MessageId messageId) throws IOException {
+               EntityManager manager = adapter.beginEntityManager(context);
+               try {
+                       StoredSubscription ss = findStoredSubscription(manager, 
clientId, subscriptionName);
+                       ss.setLastAckedId(messageId.getBrokerSequenceId());
+               } catch (Throwable e) {
+                       adapter.rollbackEntityManager(context,manager);
+                       throw IOExceptionSupport.create(e);
+               }
+               adapter.commitEntityManager(context,manager);
+       }
+
+       public void addSubsciption(String clientId, String subscriptionName, 
String selector, boolean retroactive) throws IOException {
+               EntityManager manager = adapter.beginEntityManager(null);
+               try {                   
+                       StoredSubscription ss = new StoredSubscription();
+                       ss.setClientId(clientId);
+                       ss.setSubscriptionName(subscriptionName);
+                       ss.setDestination(destinationName);
+                       ss.setSelector(selector);
+                       ss.setLastAckedId(-1);
+                       
+                       if( !retroactive ) {
+                               Query query = manager.createQuery("select 
max(m.id) from StoredMessage m");
+                               Long rc = (Long) query.getSingleResult();
+                               if( rc != null ) {
+                                       ss.setLastAckedId(rc);
+                               }
+                       }
+                       
+                       manager.persist(ss);
+               } catch (Throwable e) {
+                       adapter.rollbackEntityManager(null,manager);
+                       throw IOExceptionSupport.create(e);
+               }
+               adapter.commitEntityManager(null,manager);
+       }
+
+       public void deleteSubscription(String clientId, String 
subscriptionName) throws IOException {
+               EntityManager manager = adapter.beginEntityManager(null);
+               try {                   
+                       StoredSubscription ss = findStoredSubscription(manager, 
clientId, subscriptionName);
+                       manager.remove(ss);
+               } catch (Throwable e) {
+                       adapter.rollbackEntityManager(null,manager);
+                       throw IOExceptionSupport.create(e);
+               }
+               adapter.commitEntityManager(null,manager);
+       }
+
+       private StoredSubscription findStoredSubscription(EntityManager 
manager, String clientId, String subscriptionName) {
+               Query query = manager.createQuery(
+                               "select ss from StoredSubscription ss " +
+                               "where ss.clientId=?1 " +
+                               "and ss.subscriptionName=?2 " +
+                               "and ss.destination=?3");
+               query.setParameter(1, clientId);
+               query.setParameter(2, subscriptionName);
+               query.setParameter(3, destinationName);
+               List<StoredSubscription> resultList = query.getResultList();
+               if( resultList.isEmpty() )
+                       return null;
+               return resultList.get(0); 
+       }
+
+       public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+               SubscriptionInfo rc[];
+               EntityManager manager = adapter.beginEntityManager(null);
+               try {
+                       ArrayList<SubscriptionInfo> l = new 
ArrayList<SubscriptionInfo>();
+                       
+                       Query query = manager.createQuery("select ss from 
StoredSubscription ss where ss.destination=?1");
+                       query.setParameter(1, destinationName);
+                       for (StoredSubscription ss : 
(List<StoredSubscription>)query.getResultList()) {
+                               SubscriptionInfo info = new SubscriptionInfo();
+                               info.setClientId(ss.getClientId());
+                               info.setDestination(destination);
+                               info.setSelector(ss.getSelector());
+                               
info.setSubcriptionName(ss.getSubscriptionName());
+                               l.add(info);
+               }
+                       
+                       rc = new SubscriptionInfo[l.size()];
+                       l.toArray(rc);
+               } catch (Throwable e) {
+                       adapter.rollbackEntityManager(null,manager);
+                       throw IOExceptionSupport.create(e);
+               }
+               adapter.commitEntityManager(null,manager);              
+               return rc;
+       }
+
+       public int getMessageCount(String clientId, String subscriptionName) 
throws IOException {
+               Integer rc;
+               EntityManager manager = adapter.beginEntityManager(null);
+               try {   
+                       Query query = manager.createQuery(
+                                       "select count(m) FROM StoredMessage m, 
StoredSubscription ss " +
+                                       "where ss.clientId=?1 " +
+                                       "and   ss.subscriptionName=?2 " +
+                                       "and   ss.destination=?3 " +
+                                       "and   m.desination=ss.destination and 
m.id > ss.lastAckedId");
+                       query.setParameter(1, clientId);
+                       query.setParameter(2, subscriptionName);
+                       query.setParameter(2, destinationName);
+               rc = (Integer) query.getSingleResult();         
+               } catch (Throwable e) {
+                       adapter.rollbackEntityManager(null,manager);
+                       throw IOExceptionSupport.create(e);
+               }
+               adapter.commitEntityManager(null,manager);
+               return rc;
+       }
+
+       public SubscriptionInfo lookupSubscription(String clientId, String 
subscriptionName) throws IOException {
+               SubscriptionInfo rc=null;
+               EntityManager manager = adapter.beginEntityManager(null);
+               try {                   
+                       StoredSubscription ss = findStoredSubscription(manager, 
clientId, subscriptionName);
+                       if( ss != null ) {
+                               rc = new SubscriptionInfo();
+                               rc.setClientId(ss.getClientId());
+                               rc.setDestination(destination);
+                               rc.setSelector(ss.getSelector());
+                               rc.setSubcriptionName(ss.getSubscriptionName());
+                       }
+               } catch (Throwable e) {
+                       adapter.rollbackEntityManager(null,manager);
+                       throw IOExceptionSupport.create(e);
+               }
+               adapter.commitEntityManager(null,manager);
+               return rc;
+       }
+
+       public void recoverNextMessages(String clientId, String 
subscriptionName, int maxReturned, MessageRecoveryListener listener) throws 
Exception {
+               EntityManager manager = adapter.beginEntityManager(null);
+               try {
+                       SubscriptionId id = new SubscriptionId();
+                       id.setClientId(clientId);
+                       id.setSubscriptionName(subscriptionName);
+                       id.setDestination(destinationName);
+       
+                       AtomicLong last=subscriberLastMessageMap.get(id);
+               if(last==null){
+                       StoredSubscription ss = findStoredSubscription(manager, 
clientId, subscriptionName);
+                   last=new AtomicLong(ss.getLastAckedId());
+                   subscriberLastMessageMap.put(id,last);
+               }
+               final AtomicLong lastMessageId=last;
+                       
+                       Query query = manager.createQuery("select m from 
StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
+                       query.setParameter(1, destinationName);
+                       query.setParameter(2, lastMessageId.get());
+                       query.setMaxResults(maxReturned);
+                       int count = 0;
+                       for (StoredMessage m : 
(List<StoredMessage>)query.getResultList()) {
+                               Message message = (Message) 
wireFormat.unmarshal(new ByteSequence(m.getData()));
+                               listener.recoverMessage(message);
+                               lastMessageId.set(m.getId());
+                               count++;
+                               if( count >= maxReturned ) { 
+                                       return;
+                               }
+               }        
+               } catch (Throwable e) {
+                       adapter.rollbackEntityManager(null,manager);
+                       throw IOExceptionSupport.create(e);
+               }
+               adapter.commitEntityManager(null,manager);
+       }
+
+       public void recoverSubscription(String clientId, String 
subscriptionName, MessageRecoveryListener listener) throws Exception {
+               EntityManager manager = adapter.beginEntityManager(null);
+               try {
+       
+                       StoredSubscription ss = findStoredSubscription(manager, 
clientId, subscriptionName);
+                       
+                       Query query = manager.createQuery("select m from 
StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
+                       query.setParameter(1, destinationName);
+                       query.setParameter(2, ss.getLastAckedId());
+                       for (StoredMessage m : 
(List<StoredMessage>)query.getResultList()) {
+                               Message message = (Message) 
wireFormat.unmarshal(new ByteSequence(m.getData()));
+                               listener.recoverMessage(message);
+               }
+               } catch (Throwable e) {
+                       adapter.rollbackEntityManager(null,manager);
+                       throw IOExceptionSupport.create(e);
+               }
+               adapter.commitEntityManager(null,manager);
+       }
+
+       public void resetBatching(String clientId, String subscriptionName) {
+               SubscriptionId id = new SubscriptionId();
+               id.setClientId(clientId);
+               id.setSubscriptionName(subscriptionName);
+               id.setDestination(destinationName);
+
+        subscriberLastMessageMap.remove(id);
+       }
+
+}

Added: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java?view=auto&rev=477273
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java
 Mon Nov 20 10:13:50 2006
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jpa.model;
+
+import javax.persistence.Basic;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+
+/** 
+ */
[EMAIL PROTECTED]
+public class StoredMessage {
+       
+    @Id
+    private long id;
+       
+    @Basic
+    private String messageId;
+
+    @Basic
+    private String destination;
+
+    @Basic
+    private long exiration;
+
+    @Basic
+    private byte[] data;
+
+    public StoredMessage() {
+    }
+
+       public byte[] getData() {
+               return data;
+       }
+
+       public void setData(byte[] data) {
+               this.data = data;
+       }
+
+       public String getDestination() {
+               return destination;
+       }
+
+       public void setDestination(String destination) {
+               this.destination = destination;
+       }
+
+       public long getExiration() {
+               return exiration;
+       }
+
+       public void setExiration(long exiration) {
+               this.exiration = exiration;
+       }
+
+       public String getMessageId() {
+               return messageId;
+       }
+
+       public void setMessageId(String messageId) {
+               this.messageId = messageId;
+       }
+
+       public long getId() {
+               return id;
+       }
+
+       public void setId(long sequenceId) {
+               this.id = sequenceId;
+       }
+
+}

Added: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java?view=auto&rev=477273
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java
 Mon Nov 20 10:13:50 2006
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jpa.model;
+
+import javax.persistence.Basic;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+
+/** 
+ */
[EMAIL PROTECTED]
+public class StoredSubscription {
+               
+       /**
+     * Application identity class for Magazine.
+     */
+    public static class SubscriptionId {
+
+           public String destination;
+           public String clientId;
+           public String subscriptionName;
+
+        public boolean equals(Object other) {
+            if (other == this)
+                return true;
+            if (!(other instanceof SubscriptionId))
+                return false;
+    
+            SubscriptionId sid = (SubscriptionId) other;
+            return (destination == sid.destination || (destination != null && 
destination.equals(sid.destination)))
+                && (clientId == sid.clientId || (clientId != null && 
clientId.equals(sid.clientId)))
+                && (subscriptionName == sid.subscriptionName || 
(subscriptionName != null && subscriptionName.equals(sid.subscriptionName)));
+        }
+     
+        /**
+         * Hashcode must also depend on identity values.
+         */
+        public int hashCode() {
+            return ((destination == null) ? 0 : destination.hashCode())
+                ^ ((clientId == null) ? 0 : clientId.hashCode())
+                ^ ((subscriptionName == null) ? 0 : 
subscriptionName.hashCode())
+                ;
+        } 
+
+        public String toString() {
+            return destination + ":" + clientId + ":" + subscriptionName;
+        }
+
+               public String getClientId() {
+                       return clientId;
+               }
+
+               public void setClientId(String clientId) {
+                       this.clientId = clientId;
+               }
+
+               public String getDestination() {
+                       return destination;
+               }
+
+               public void setDestination(String destination) {
+                       this.destination = destination;
+               }
+
+               public String getSubscriptionName() {
+                       return subscriptionName;
+               }
+
+               public void setSubscriptionName(String subscriptionName) {
+                       this.subscriptionName = subscriptionName;
+               }
+    }
+
+    @Id
+    @GeneratedValue(strategy=GenerationType.AUTO) 
+    private long id;
+    
+    @Basic
+    private String destination;
+    @Basic
+    private String clientId;
+    @Basic
+    private String subscriptionName;
+    
+    @Basic
+    private long lastAckedId;
+    @Basic
+    private String selector;
+
+
+       public long getLastAckedId() {
+               return lastAckedId;
+       }
+
+       public void setLastAckedId(long lastAckedId) {
+               this.lastAckedId = lastAckedId;
+       }
+
+       public String getSelector() {
+               return selector;
+       }
+
+       public void setSelector(String selector) {
+               this.selector = selector;
+       }
+
+       public String getDestination() {
+               return destination;
+       }
+
+       public void setDestination(String destination) {
+               this.destination = destination;
+       }
+
+       public String getClientId() {
+               return clientId;
+       }
+
+       public void setClientId(String clientId) {
+               this.clientId = clientId;
+       }
+
+       public String getSubscriptionName() {
+               return subscriptionName;
+       }
+
+       public void setSubscriptionName(String subscriptionName) {
+               this.subscriptionName = subscriptionName;
+       }
+
+       public long getId() {
+               return id;
+       }
+
+       public void setId(long id) {
+               this.id = id;
+       }
+}

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java?view=diff&rev=477273&r1=477272&r2=477273
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java
 Mon Nov 20 10:13:50 2006
@@ -54,5 +54,14 @@
        public void setOffset(int offset) {
                this.offset = offset;
        }
+       
+       public void compact() {
+               if( length != data.length ) {
+                       byte t[] = new byte[length];
+                       System.arraycopy(data, offset, t, 0, length);
+                       data=t;
+                       offset=0;
+               }
+       }
 
 }

Added: 
incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml?view=auto&rev=477273
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/persistence.xml
 Mon Nov 20 10:13:50 2006
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright 2006 The Apache Software Foundation.
+ 
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ 
+ http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<persistence xmlns="http://java.sun.com/xml/ns/persistence";
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+    version="1.0">
+    <persistence-unit name="activemq" transaction-type="RESOURCE_LOCAL">
+        
<provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
+        <class>org.apache.activemq.store.jpa.model.StoredMessage</class>
+        <class>org.apache.activemq.store.jpa.model.StoredSubscription</class>
+        <!-- 
+        
<class>org.apache.activemq.store.jpa.model.StoredSubscription$SubscriptionId</class>
+         -->
+    </persistence-unit>
+</persistence>

Added: 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java?view=auto&rev=477273
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java
 Mon Nov 20 10:13:50 2006
@@ -0,0 +1,71 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+import java.util.Properties;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.RecoveryBrokerTest;
+import org.apache.activemq.store.jpa.JPAPersistenceAdapter;
+
+/**
+ * Used to verify that recovery works correctly against 
+ * 
+ * @version $Revision$
+ */
+public class JPARecoveryBrokerTest extends RecoveryBrokerTest {
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService service = new BrokerService();
+        service.setDeleteAllMessagesOnStartup(true);
+        JPAPersistenceAdapter pa = new JPAPersistenceAdapter();
+        Properties props = new Properties();
+        props.setProperty("openjpa.ConnectionDriverName", 
"org.apache.derby.jdbc.EmbeddedDriver");
+        props.setProperty("openjpa.ConnectionURL", 
"jdbc:derby:activemq-data/derby;create=true");
+        props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
+        props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+        pa.setEntityManagerProperties(props);
+        service.setPersistenceAdapter(pa);
+        return service;
+        
+    }
+    
+    protected BrokerService createRestartedBroker() throws Exception {
+        BrokerService service = new BrokerService();
+        JPAPersistenceAdapter pa = new JPAPersistenceAdapter();
+        Properties props = new Properties();
+        props.setProperty("openjpa.ConnectionDriverName", 
"org.apache.derby.jdbc.EmbeddedDriver");
+        props.setProperty("openjpa.ConnectionURL", 
"jdbc:derby:activemq-data/derby;create=true");
+        props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
+        props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+        pa.setEntityManagerProperties(props);
+        service.setPersistenceAdapter(pa);
+        return service;
+    }
+    
+    public static Test suite() {
+        return suite(JPARecoveryBrokerTest.class);
+    }
+    
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

Modified: incubator/activemq/trunk/pom.xml
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/pom.xml?view=diff&rev=477273&r1=477272&r2=477273
==============================================================================
--- incubator/activemq/trunk/pom.xml (original)
+++ incubator/activemq/trunk/pom.xml Mon Nov 20 10:13:50 2006
@@ -314,6 +314,12 @@
         <version>${commons-collections-version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.openjpa</groupId>
+        <artifactId>openjpa-persistence-jdbc</artifactId>
+        <version>${openjpa-version}</version>
+      </dependency>    
+
       <!-- Optional Spring Support -->
       <dependency>
         <groupId>org.springframework</groupId>
@@ -868,8 +874,9 @@
     <axis-version>1.2-RC1</axis-version>
     <cglib-version>2.0</cglib-version>
     <commons-beanutils-version>1.6.1</commons-beanutils-version>
-    <commons-collections-version>2.1</commons-collections-version>
-    <commons-dbcp-version>1.2</commons-dbcp-version>
+    <commons-collections-version>3.1</commons-collections-version>
+    <openjpa-version>0.9.6-incubating</openjpa-version>
+    <commons-dbcp-version>1.2.1</commons-dbcp-version>
     <commons-httpclient-version>2.0.1</commons-httpclient-version>
     <commons-logging-version>1.1</commons-logging-version>
     <commons-pool-version>1.2</commons-pool-version>
@@ -887,7 +894,7 @@
     <junit-version>3.8.1</junit-version>
     <jxta-version>2.0</jxta-version>
     <log4j-version>1.2.12</log4j-version>
-    <org-apache-derby-version>10.1.1.0</org-apache-derby-version>
+    <org-apache-derby-version>10.1.3.1</org-apache-derby-version>
     <org-apache-geronimo-specs-version>1.0</org-apache-geronimo-specs-version>
     
<org-apache-maven-surefire-plugin-version>2.2</org-apache-maven-surefire-plugin-version>
     <p2psockets-version>1.1.2</p2psockets-version>


Reply via email to