Author: gtully
Date: Thu Dec 17 08:42:52 2009
New Revision: 891582
URL: http://svn.apache.org/viewvc?rev=891582&view=rev
Log:
Resolve https://issues.apache.org/activemq/browse/AMQ-2473 - issue with
JmsTemplate, where producers are closed before the transaction commits, which is
fine except when failover occurs. The producers need to be replayed to allow
tracked messages to be replayed. Added the capability to track and replay
transaction producers. Can be disabled if producers outlive transactions.
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=891582&r1=891581&r2=891582&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
Thu Dec 17 08:42:52 2009
@@ -29,7 +29,6 @@
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
@@ -61,6 +60,7 @@
private boolean restoreProducers = true;
private boolean restoreTransaction = true;
private boolean trackMessages = true;
+ private boolean trackTransactionProducers = true;
private int maxCacheSize = 128 * 1024;
private int currentCacheSize;
private Map<MessageId,Message> messageCache = new
LinkedHashMap<MessageId,Message>(){
@@ -136,18 +136,31 @@
}
private void restoreTransactions(Transport transport, ConnectionState
connectionState) throws IOException {
- for (Iterator iter =
connectionState.getTransactionStates().iterator(); iter.hasNext();) {
- TransactionState transactionState = (TransactionState)iter.next();
+ for (TransactionState transactionState :
connectionState.getTransactionStates()) {
if (LOG.isDebugEnabled()) {
LOG.debug("tx: " + transactionState.getId());
}
- for (Iterator iterator =
transactionState.getCommands().iterator(); iterator.hasNext();) {
- Command command = (Command)iterator.next();
+
+ for (ProducerState producerState :
transactionState.getProducerStates().values()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("tx replay producer :" +
producerState.getInfo());
+ }
+ transport.oneway(producerState.getInfo());
+ }
+
+ for (Command command : transactionState.getCommands()) {
if (LOG.isDebugEnabled()) {
LOG.debug("tx replay: " + command);
}
transport.oneway(command);
}
+
+ for (ProducerState producerState :
transactionState.getProducerStates().values()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("tx remove replayed producer :" +
producerState.getInfo());
+ }
+
transport.oneway(producerState.getInfo().createRemoveCommand());
+ }
}
}
@@ -350,13 +363,22 @@
public Response processMessage(Message send) throws Exception {
if (send != null) {
if (trackTransactions && send.getTransactionId() != null) {
- ConnectionId connectionId =
send.getProducerId().getParentId().getParentId();
+ ProducerId producerId = send.getProducerId();
+ ConnectionId connectionId =
producerId.getParentId().getParentId();
if (connectionId != null) {
ConnectionState cs = connectionStates.get(connectionId);
if (cs != null) {
TransactionState transactionState =
cs.getTransactionState(send.getTransactionId());
if (transactionState != null) {
transactionState.addCommand(send);
+
+ if (trackTransactionProducers) {
+ // for jmstemplate, track the producer in case
it is closed before commit
+ // and needs to be replayed
+ SessionState ss =
cs.getSessionState(producerId.getParentId());
+ ProducerState producerState =
ss.getProducerState(producerId);
+
producerState.setTransactionState(transactionState);
+ }
}
}
}
@@ -500,7 +522,15 @@
public void setTrackTransactions(boolean trackTransactions) {
this.trackTransactions = trackTransactions;
}
+
+ public boolean isTrackTransactionProducers() {
+ return this.trackTransactionProducers;
+ }
+ public void setTrackTransactionProducers(boolean
trackTransactionProducers) {
+ this.trackTransactionProducers = trackTransactionProducers;
+ }
+
public boolean isRestoreTransaction() {
return restoreTransaction;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java?rev=891582&r1=891581&r2=891582&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
Thu Dec 17 08:42:52 2009
@@ -21,6 +21,7 @@
public class ProducerState {
final ProducerInfo info;
+ private TransactionState transactionState;
public ProducerState(ProducerInfo info) {
this.info = info;
@@ -34,4 +35,11 @@
return info;
}
+ public void setTransactionState(TransactionState transactionState) {
+ this.transactionState = transactionState;
+ }
+
+ public TransactionState getTransactionState() {
+ return transactionState;
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java?rev=891582&r1=891581&r2=891582&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
Thu Dec 17 08:42:52 2009
@@ -50,9 +50,16 @@
}
public ProducerState removeProducer(ProducerId id) {
- return producers.remove(id);
+ ProducerState producerState = producers.remove(id);
+ if (producerState != null) {
+ if (producerState.getTransactionState() != null) {
+ // allow the transaction to recreate dependent producer on
recovery
+
producerState.getTransactionState().addProducerState(producerState);
+ }
+ }
+ return producerState;
}
-
+
public void addConsumer(ConsumerInfo info) {
checkShutdown();
consumers.put(info.getConsumerId(), new ConsumerState(info));
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java?rev=891582&r1=891581&r2=891582&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java
Thu Dec 17 08:42:52 2009
@@ -18,9 +18,12 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.TransactionId;
public class TransactionState {
@@ -30,6 +33,7 @@
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private boolean prepared;
private int preparedResult;
+ private final Map<ProducerId, ProducerState> producers = new
ConcurrentHashMap<ProducerId, ProducerState>();
public TransactionState(TransactionId id) {
this.id = id;
@@ -78,4 +82,14 @@
return preparedResult;
}
+ public void addProducerState(ProducerState producerState) {
+ if (producerState != null) {
+ producers.put(producerState.getInfo().getProducerId(),
producerState);
+ }
+ }
+
+ public Map<ProducerId, ProducerState> getProducerStates() {
+ return producers;
+ }
+
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=891582&r1=891581&r2=891582&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Thu Dec 17 08:42:52 2009
@@ -95,6 +95,7 @@
private List<BackupTransport> backups=new
CopyOnWriteArrayList<BackupTransport>();
private int backupPoolSize=1;
private boolean trackMessages = false;
+ private boolean trackTransactionProducers = true;
private int maxCacheSize = 128 * 1024;
private TransportListener disposedListener = new
DefaultTransportListener() {};
@@ -233,6 +234,7 @@
started = true;
stateTracker.setMaxCacheSize(getMaxCacheSize());
stateTracker.setTrackMessages(isTrackMessages());
+
stateTracker.setTrackTransactionProducers(isTrackTransactionProducers());
if (connectedTransport.get() != null) {
stateTracker.restore(connectedTransport.get());
} else {
@@ -372,6 +374,14 @@
this.trackMessages = trackMessages;
}
+ public boolean isTrackTransactionProducers() {
+ return this.trackTransactionProducers;
+ }
+
+ public void setTrackTransactionProducers(boolean
trackTransactionProducers) {
+ this.trackTransactionProducers = trackTransactionProducers;
+ }
+
public int getMaxCacheSize() {
return maxCacheSize;
}
@@ -495,7 +505,7 @@
} catch (IOException e) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Send oneway attempt: " + i + " failed
for command:" + command);
+ LOG.debug("Send oneway attempt: " + i + " failed
for command:" + command);
}
handleTransportFailure(e);
}
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=891582&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Thu Dec 17 08:42:52 2009
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+// see https://issues.apache.org/activemq/browse/AMQ-2473
+public class FailoverTransactionTest {
+
+ private static final String QUEUE_NAME = "test.FailoverTransactionTest";
+ private String url = "tcp://localhost:61616";
+ BrokerService broker;
+
+ @Before
+ public void startCleanBroker() throws Exception {
+ startBroker(true);
+ }
+
+ @After
+ public void stopBroker() throws Exception {
+ if (broker != null) {
+ broker.stop();
+ }
+ }
+
+ public void startBroker(boolean deleteAllMessagesOnStartup) throws
Exception {
+ broker = new BrokerService();
+ broker.setUseJmx(false);
+ broker.addConnector(url);
+ broker.setDeleteAllMessagesOnStartup(true);
+ broker.start();
+ }
+
+ @Test
+ public void testFailoverProducerCloseBeforeTransaction() throws
Exception {
+
+ ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
+ Connection connection = cf.createConnection();
+ connection.start();
+ Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
+ Queue destination = session.createQueue(QUEUE_NAME);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+
+ TextMessage message = session.createTextMessage("Test message");
+ producer.send(message);
+
+ // close producer before commit, emulate jmstemplate
+ producer.close();
+
+ // restart to force failover and connection state recovery
before the commit
+ broker.stop();
+ startBroker(false);
+
+ session.commit();
+ assertNotNull("we got the message", consumer.receive(20000));
+ session.commit();
+ connection.close();
+ }
+
+ @Test
+ public void
testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
+
+ ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url +
")?trackTransactionProducers=false");
+ Connection connection = cf.createConnection();
+ connection.start();
+ Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
+ Queue destination = session.createQueue(QUEUE_NAME);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+
+ TextMessage message = session.createTextMessage("Test message");
+ producer.send(message);
+
+ // close producer before commit, emulate jmstemplate
+ producer.close();
+
+ // restart to force failover and connection state recovery before
the commit
+ broker.stop();
+ startBroker(false);
+
+ session.commit();
+
+ // withough tracking producers, message will not be replayed on
recovery
+ assertNull("we got the message", consumer.receive(2000));
+ session.commit();
+ connection.close();
+ }
+
+ @Test
+ public void testFailoverMultipleProducerCloseBeforeTransaction() throws
Exception {
+
+ ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
+ Connection connection = cf.createConnection();
+ connection.start();
+ Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
+ Queue destination = session.createQueue(QUEUE_NAME);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer;
+ TextMessage message;
+ final int count = 10;
+ for (int i=0; i<count; i++) {
+ producer = session.createProducer(destination);
+ message = session.createTextMessage("Test message: " + count);
+ producer.send(message);
+ producer.close();
+ }
+
+ // restart to force failover and connection state recovery before
the commit
+ broker.stop();
+ startBroker(false);
+
+ session.commit();
+ for (int i=0; i<count; i++) {
+ assertNotNull("we got all the message: " + count,
consumer.receive(20000));
+ }
+ session.commit();
+ connection.close();
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date