Author: gtully
Date: Fri Jun 22 20:43:24 2012
New Revision: 1353024
URL: http://svn.apache.org/viewvc?rev=1353024&view=rev
Log:
fix up destination statistics for recovered transactions, pending adds are not
visible, but pending acks are still accounted for in the messages count,
commit/rollback updates enqueues/dequeues/messages as expected -
https://issues.apache.org/jira/browse/AMQ-3872,
https://issues.apache.org/jira/browse/AMQ-3305 - both kahadb and jdbc suffered
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java?rev=1353024&r1=1353023&r2=1353024&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
Fri Jun 22 20:43:24 2012
@@ -30,7 +30,6 @@ import javax.transaction.xa.XAException;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BaseCommand;
import org.apache.activemq.command.ConnectionInfo;
@@ -139,19 +138,28 @@ public class TransactionBroker extends B
private void registerSync(Destination destination, Transaction
transaction, BaseCommand command) {
Synchronization sync = new PreparedDestinationCompletion(destination,
command.isMessage());
// ensure one per destination in the list
- transaction.removeSynchronization(sync);
- transaction.addSynchronization(sync);
+ Synchronization existing = transaction.findMatching(sync);
+ if (existing != null) {
+ ((PreparedDestinationCompletion)existing).incrementOpCount();
+ } else {
+ transaction.addSynchronization(sync);
+ }
}
static class PreparedDestinationCompletion extends Synchronization {
final Destination destination;
final boolean messageSend;
+ int opCount = 1;
public PreparedDestinationCompletion(final Destination destination,
boolean messageSend) {
this.destination = destination;
// rollback relevant to acks, commit to sends
this.messageSend = messageSend;
}
+ public void incrementOpCount() {
+ opCount++;
+ }
+
@Override
public int hashCode() {
return System.identityHashCode(destination) +
@@ -179,9 +187,14 @@ public class TransactionBroker extends B
public void afterCommit() throws Exception {
if (messageSend) {
destination.clearPendingMessages();
+
destination.getDestinationStatistics().getEnqueues().add(opCount);
+
destination.getDestinationStatistics().getMessages().add(opCount);
if (LOG.isDebugEnabled()) {
LOG.debug("cleared pending from afterCommit : " +
destination);
}
+ } else {
+
destination.getDestinationStatistics().getDequeues().add(opCount);
+
destination.getDestinationStatistics().getMessages().subtract(opCount);
}
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java?rev=1353024&r1=1353023&r2=1353024&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
Fri Jun 22 20:43:24 2012
@@ -17,16 +17,10 @@
package org.apache.activemq.store.jdbc;
import java.io.IOException;
-import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.Map;
import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DurableTopicSubscription;
-import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@@ -37,11 +31,9 @@ import org.apache.activemq.store.Message
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
-import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.apache.activemq.store.memory.MemoryTransactionStore;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
-import org.apache.activemq.util.SubscriptionKey;
/**
* respect 2pc prepare
@@ -305,6 +297,11 @@ public class JdbcMemoryTransactionStore
JDBCTopicMessageStore jdbcTopicMessageStore =
(JDBCTopicMessageStore)
topicStores.get(lastAckCommand.getMessageAck().getDestination());
jdbcTopicMessageStore.pendingCompletion(lastAckCommand.getClientId(),
lastAckCommand.getSubName(), lastAckCommand.getSequence(),
lastAckCommand.getPriority());
lastAckCommand.setMessageStore(jdbcTopicMessageStore);
+ } else {
+ // when reading the store we ignore messages with non null
XIDs but should include those with XIDS starting in - (pending acks in an xa
transaction),
+ // but the sql is non portable to match BLOB with LIKE etc
+ // so we make up for it when we recover the ack
+
((JDBCPersistenceAdapter)persistenceAdapter).getBrokerService().getRegionBroker().getDestinationMap().get(removeMessageCommand.getMessageAck().getDestination()).getDestinationStatistics().getMessages().increment();
}
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1353024&r1=1353023&r2=1353024&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
Fri Jun 22 20:43:24 2012
@@ -354,6 +354,7 @@ public class DefaultJDBCAdapter implemen
s.setLong(1, seq);
} else {
byte[] xidVal = xid.getEncodedXidBytes();
+ xidVal[0] = '-';
setBinaryData(s, 1, xidVal);
s.setLong(2, seq);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java?rev=1353024&r1=1353023&r2=1353024&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
Fri Jun 22 20:43:24 2012
@@ -71,6 +71,14 @@ public abstract class Transaction {
}
}
+ public Synchronization findMatching(Synchronization r) {
+ int existing = synchronizations.indexOf(r);
+ if (existing != -1) {
+ return synchronizations.get(existing);
+ }
+ return null;
+ }
+
public void removeSynchronization(Synchronization r) {
synchronizations.remove(r);
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java?rev=1353024&r1=1353023&r2=1353024&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
Fri Jun 22 20:43:24 2012
@@ -108,11 +108,13 @@ public class XARecoveryBrokerTest extend
assertEquals("enqueue count does not see prepared", 0,
destinationView.getQueueSize());
TransactionId first = (TransactionId)dar.getData()[0];
+ int commitCount = 0;
// via jmx, force outcome
for (int i = 0; i < 4; i++) {
RecoveredXATransactionViewMBean mbean =
getProxyToPreparedTransactionViewMBean((TransactionId)dar.getData()[i]);
if (i%2==0) {
mbean.heuristicCommit();
+ commitCount++;
} else {
mbean.heuristicRollback();
}
@@ -124,6 +126,9 @@ public class XARecoveryBrokerTest extend
dar = (DataArrayResponse)response;
assertEquals(0, dar.getData().length);
+ // verify messages available
+ assertEquals("enqueue count reflects outcome", commitCount,
destinationView.getQueueSize());
+
// verify mbeans gone
try {
RecoveredXATransactionViewMBean gone =
getProxyToPreparedTransactionViewMBean(first);
@@ -547,11 +552,20 @@ public class XARecoveryBrokerTest extend
assertNull(m);
assertNoMessagesLeft(connection);
+ // validate destination depth via jmx
+ DestinationViewMBean destinationView =
getProxyToDestination(destinationList(destination)[0]);
+ assertEquals("enqueue count does not see prepared acks", 4,
destinationView.getQueueSize());
+ assertEquals("enqueue count does not see prepared acks", 0,
destinationView.getDequeueCount());
+
connection.request(createCommitTransaction2Phase(connectionInfo,
txid));
// validate recovery complete
dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
assertEquals("there are no prepared tx", 0,
dataArrayResponse.getData().length);
+
+ assertEquals("enqueue count does not see commited acks", 0,
destinationView.getQueueSize());
+ assertEquals("enqueue count does not see commited acks", 4,
destinationView.getDequeueCount());
+
}
public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart()
{