Author: gtully
Date: Thu Dec 17 08:42:52 2009
New Revision: 891582

URL: http://svn.apache.org/viewvc?rev=891582&view=rev
Log:
Resolve https://issues.apache.org/activemq/browse/AMQ-2473 - issue with 
JmsTemplate, where producers are closed before the transaction commits. This is 
fine except when failover occurs: the producers need to be replayed to allow the 
tracked messages to be replayed. Added the capability to track and replay 
transaction producers. It can be disabled (trackTransactionProducers=false) if producers outlive transactions.

Added:
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
   (with props)
Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=891582&r1=891581&r2=891582&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
 Thu Dec 17 08:42:52 2009
@@ -29,7 +29,6 @@
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
@@ -61,6 +60,7 @@
     private boolean restoreProducers = true;
     private boolean restoreTransaction = true;
     private boolean trackMessages = true;
+    private boolean trackTransactionProducers = true;
     private int maxCacheSize = 128 * 1024;
     private int currentCacheSize;
     private Map<MessageId,Message> messageCache = new 
LinkedHashMap<MessageId,Message>(){
@@ -136,18 +136,31 @@
     }
 
     private void restoreTransactions(Transport transport, ConnectionState 
connectionState) throws IOException {
-        for (Iterator iter = 
connectionState.getTransactionStates().iterator(); iter.hasNext();) {
-            TransactionState transactionState = (TransactionState)iter.next();
+        for (TransactionState transactionState : 
connectionState.getTransactionStates()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("tx: " + transactionState.getId());
             }
-            for (Iterator iterator = 
transactionState.getCommands().iterator(); iterator.hasNext();) {
-                Command command = (Command)iterator.next();
+            
+            for (ProducerState producerState : 
transactionState.getProducerStates().values()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("tx replay producer :" + 
producerState.getInfo());
+                }
+                transport.oneway(producerState.getInfo());
+            }
+            
+            for (Command command : transactionState.getCommands()) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("tx replay: " + command);
                 }
                 transport.oneway(command);
             }
+            
+            for (ProducerState producerState : 
transactionState.getProducerStates().values()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("tx remove replayed producer :" + 
producerState.getInfo());
+                }
+                
transport.oneway(producerState.getInfo().createRemoveCommand());
+            }
         }
     }
 
@@ -350,13 +363,22 @@
     public Response processMessage(Message send) throws Exception {
         if (send != null) {
             if (trackTransactions && send.getTransactionId() != null) {
-                ConnectionId connectionId = 
send.getProducerId().getParentId().getParentId();
+                ProducerId producerId = send.getProducerId();
+                ConnectionId connectionId = 
producerId.getParentId().getParentId();
                 if (connectionId != null) {
                     ConnectionState cs = connectionStates.get(connectionId);
                     if (cs != null) {
                         TransactionState transactionState = 
cs.getTransactionState(send.getTransactionId());
                         if (transactionState != null) {
                             transactionState.addCommand(send);
+                            
+                            if (trackTransactionProducers) {
+                                // for jmstemplate, track the producer in case 
it is closed before commit
+                                // and needs to be replayed
+                                SessionState ss = 
cs.getSessionState(producerId.getParentId());
+                                ProducerState producerState = 
ss.getProducerState(producerId);
+                                
producerState.setTransactionState(transactionState);            
+                            }
                         }
                     }
                 }
@@ -500,7 +522,15 @@
     public void setTrackTransactions(boolean trackTransactions) {
         this.trackTransactions = trackTransactions;
     }
+    
+    public boolean isTrackTransactionProducers() {
+        return this.trackTransactionProducers;
+    }
 
+    public void setTrackTransactionProducers(boolean 
trackTransactionProducers) {
+        this.trackTransactionProducers = trackTransactionProducers;
+    }
+    
     public boolean isRestoreTransaction() {
         return restoreTransaction;
     }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java?rev=891582&r1=891581&r2=891582&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
 Thu Dec 17 08:42:52 2009
@@ -21,6 +21,7 @@
 
 public class ProducerState {
     final ProducerInfo info;
+    private TransactionState transactionState;
 
     public ProducerState(ProducerInfo info) {
         this.info = info;
@@ -34,4 +35,11 @@
         return info;
     }
 
+    public void setTransactionState(TransactionState transactionState) {
+        this.transactionState = transactionState;
+    }
+
+    public TransactionState getTransactionState() {
+        return transactionState;
+    }
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java?rev=891582&r1=891581&r2=891582&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
 Thu Dec 17 08:42:52 2009
@@ -50,9 +50,16 @@
     }
 
     public ProducerState removeProducer(ProducerId id) {
-        return producers.remove(id);
+        ProducerState producerState = producers.remove(id);
+        if (producerState != null) {
+            if (producerState.getTransactionState() != null) {
+                // allow the transaction to recreate dependent producer on 
recovery
+                
producerState.getTransactionState().addProducerState(producerState);
+            }
+        }
+        return producerState;
     }
-
+    
     public void addConsumer(ConsumerInfo info) {
         checkShutdown();
         consumers.put(info.getConsumerId(), new ConsumerState(info));

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java?rev=891582&r1=891581&r2=891582&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java
 Thu Dec 17 08:42:52 2009
@@ -18,9 +18,12 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.TransactionId;
 
 public class TransactionState {
@@ -30,6 +33,7 @@
     private final AtomicBoolean shutdown = new AtomicBoolean(false);
     private boolean prepared;
     private int preparedResult;
+    private final Map<ProducerId, ProducerState> producers = new 
ConcurrentHashMap<ProducerId, ProducerState>();
 
     public TransactionState(TransactionId id) {
         this.id = id;
@@ -78,4 +82,14 @@
         return preparedResult;
     }
 
+    public void addProducerState(ProducerState producerState) {
+        if (producerState != null) {
+            producers.put(producerState.getInfo().getProducerId(), 
producerState);
+        }
+    }
+
+    public Map<ProducerId, ProducerState> getProducerStates() {
+        return producers;
+    }
+
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=891582&r1=891581&r2=891582&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
 Thu Dec 17 08:42:52 2009
@@ -95,6 +95,7 @@
     private List<BackupTransport> backups=new 
CopyOnWriteArrayList<BackupTransport>();
     private int backupPoolSize=1;
     private boolean trackMessages = false;
+    private boolean trackTransactionProducers = true;
     private int maxCacheSize = 128 * 1024;
     private TransportListener disposedListener = new 
DefaultTransportListener() {};
     
@@ -233,6 +234,7 @@
             started = true;
             stateTracker.setMaxCacheSize(getMaxCacheSize());
             stateTracker.setTrackMessages(isTrackMessages());
+            
stateTracker.setTrackTransactionProducers(isTrackTransactionProducers());
             if (connectedTransport.get() != null) {
                 stateTracker.restore(connectedTransport.get());
             } else {
@@ -372,6 +374,14 @@
         this.trackMessages = trackMessages;
     }
 
+    public boolean isTrackTransactionProducers() {
+        return this.trackTransactionProducers;
+    }
+
+    public void setTrackTransactionProducers(boolean 
trackTransactionProducers) {
+        this.trackTransactionProducers = trackTransactionProducers;
+    }
+
     public int getMaxCacheSize() {
         return maxCacheSize;
     }
@@ -495,7 +505,7 @@
 
                     } catch (IOException e) {
                         if (LOG.isDebugEnabled()) {
-                            LOG.debug("Send oneway attempt: " + i + " failed 
for command:" + command);   
+                            LOG.debug("Send oneway attempt: " + i + " failed 
for command:" + command);
                         }
                         handleTransportFailure(e);
                     }

Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=891582&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
 (added)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
 Thu Dec 17 08:42:52 2009
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+// see https://issues.apache.org/activemq/browse/AMQ-2473
+public class FailoverTransactionTest {
+       
+       private static final String QUEUE_NAME = "test.FailoverTransactionTest";
+       private String url = "tcp://localhost:61616";
+       BrokerService broker;
+       
+       @Before
+       public void startCleanBroker() throws Exception {
+           startBroker(true);
+       }
+       
+       @After
+       public void stopBroker() throws Exception {
+           if (broker != null) {
+               broker.stop();
+           }
+       }
+       
+       public void startBroker(boolean deleteAllMessagesOnStartup) throws 
Exception {
+           broker = new BrokerService();
+        broker.setUseJmx(false);
+        broker.addConnector(url);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.start();
+       }
+       
+       @Test
+       public void testFailoverProducerCloseBeforeTransaction() throws 
Exception {
+               
+               ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("failover:(" + url + ")");
+               Connection connection = cf.createConnection();
+               connection.start();
+               Session session = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
+               Queue destination = session.createQueue(QUEUE_NAME);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+               MessageProducer producer = session.createProducer(destination);
+               
+               TextMessage message = session.createTextMessage("Test message");
+               producer.send(message);
+
+               // close producer before commit, emulate jmstemplate
+               producer.close();
+               
+               // restart to force failover and connection state recovery 
before the commit
+               broker.stop();
+               startBroker(false);
+
+               session.commit();
+               assertNotNull("we got the message", consumer.receive(20000));
+               session.commit();       
+               connection.close();
+       }
+       
+       @Test
+       public void 
testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
+               
+           ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("failover:(" + url + 
")?trackTransactionProducers=false");
+           Connection connection = cf.createConnection();
+           connection.start();
+           Session session = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
+           Queue destination = session.createQueue(QUEUE_NAME);
+           
+           MessageConsumer consumer = session.createConsumer(destination);
+           MessageProducer producer = session.createProducer(destination);
+           
+           TextMessage message = session.createTextMessage("Test message");
+           producer.send(message);
+           
+           // close producer before commit, emulate jmstemplate
+           producer.close();
+           
+           // restart to force failover and connection state recovery before 
the commit
+           broker.stop();
+           startBroker(false);
+           
+           session.commit();
+           
+           // without tracking producers, message will not be replayed on 
recovery
+           assertNull("we got the message", consumer.receive(2000));
+           session.commit();   
+           connection.close();
+       }
+       
+       @Test
+       public void testFailoverMultipleProducerCloseBeforeTransaction() throws 
Exception {
+               
+           ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("failover:(" + url + ")");
+           Connection connection = cf.createConnection();
+           connection.start();
+           Session session = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
+           Queue destination = session.createQueue(QUEUE_NAME);
+           
+           MessageConsumer consumer = session.createConsumer(destination);
+           MessageProducer producer;
+           TextMessage message;
+           final int count = 10;
+           for (int i=0; i<count; i++) {
+               producer = session.createProducer(destination);         
+               message = session.createTextMessage("Test message: " + count);
+               producer.send(message);
+               producer.close();
+           }
+           
+           // restart to force failover and connection state recovery before 
the commit
+           broker.stop();
+           startBroker(false);
+           
+           session.commit();
+           for (int i=0; i<count; i++) {
+               assertNotNull("we got all the message: " + count, 
consumer.receive(20000));
+           }
+           session.commit();
+           connection.close();
+       }  
+}

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to