[ https://issues.apache.org/jira/browse/AMQ-3166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13917743#comment-13917743 ]
Arthur Naseef commented on AMQ-3166:
------------------------------------
I tried forcing a rollback on the transaction, and that does cause the commit
to fail, but the exception seems misleading to me: it states that the
transaction has not been started:
{noformat}
javax.jms.JMSException: Transaction 'TX:ID:Arthur-Naseefs-MacBook-Pro.local-58321-1393821595757-5:1:1' has not been started.
{noformat}
Here's my patch:
{noformat}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/mai
index 65d044b..fc4a674 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -306,6 +306,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
                 responseRequired = false;
             }
+            // AMQ-3166: remember and propagate transaction failures.
+            serviceTransactionExceptions(command, e);
+
             if (responseRequired) {
                 response = new ExceptionResponse(e);
             } else {
@@ -1608,4 +1611,44 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     public WireFormatInfo getRemoteWireFormatInfo() {
         return wireFormatInfo;
     }
+
+    protected void serviceTransactionExceptions(Command command, Throwable thrown) {
+        if ( command instanceof Message ) {
+            Message msg = (Message) command;
+
+            if ( msg.isInTransaction() ) {
+                LOG.debug("marking transaction {} failed on message {} due to {}", msg.getTransactionId(),
+                        msg.getMessageId(), thrown.getMessage());
+                this.markTransactionFailure(msg.getConnection().getConnectionInfo().getConnectionId(),
+                        msg.getTransactionId(), thrown);
+            }
+        } else if ( command instanceof MessageAck ) {
+            MessageAck ack = (MessageAck) command;
+
+            if ( ack.isInTransaction() ) {
+                LOG.debug("marking transaction {} failed on ack for messages {}..{} due to {}", ack.getTransactionId(),
+                        ack.getFirstMessageId(), ack.getLastMessageId(), thrown.getMessage());
+                this.markTransactionFailure(ack.getConsumerId().getParentId().getParentId(), ack.getTransactionId(),
+                        thrown);
+            }
+        }
+
+        // TBD: any other commands? MessagePull doesn't contain transaction info, and I suspect that it doesn't need
+        // it since it must be synchronous.
+    }
+
+    protected void markTransactionFailure(ConnectionId connId, TransactionId tId, Throwable thrown) {
+        TransactionInfo rollback = new TransactionInfo();
+        rollback.setConnectionId(connId);
+        rollback.setTransactionId(tId);
+        rollback.setType(TransactionInfo.ROLLBACK);
+
+        try {
+            // Reuse the existing rollback logic.
+            this.processRollbackTransaction(rollback);
+        } catch ( Throwable err ) {
+            LOG.warn("failed to force rollback on transaction {} after initial failure {}", tId, thrown.getMessage(),
+                    err);
+        }
+    }
 }
{noformat}
I'm looking at adding a "failed" status to TransactionState - that appears to
be a feasible approach, and seems more correct as well.
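
As a rough illustration of the "failed" status idea, here is a minimal, self-contained sketch. It does not use the real TransactionState class; FailedTransactionSketch, TxState, and TransactionFailedException are hypothetical names invented for this example. The point is only that a transaction which remembers its first failure can reject the commit with a message naming the real cause, instead of the misleading "has not been started".

```java
/**
 * Sketch only: a stand-in for broker-side transaction state that carries a
 * "failed" status. None of these types are ActiveMQ classes.
 */
public class FailedTransactionSketch {

    /** Hypothetical exception raised when a failed transaction is committed. */
    public static class TransactionFailedException extends Exception {
        public TransactionFailedException(String txId, Throwable cause) {
            super("Transaction '" + txId + "' cannot be committed, an earlier operation failed: "
                    + cause.getMessage(), cause);
        }
    }

    /** Hypothetical transaction state with a sticky failure marker. */
    public static class TxState {
        private final String id;
        private Throwable failure; // first failure seen, or null if none

        public TxState(String id) {
            this.id = id;
        }

        /** Remember the first failure; later failures do not overwrite it. */
        public synchronized void markFailed(Throwable cause) {
            if (failure == null) {
                failure = cause;
            }
        }

        public synchronized boolean isFailed() {
            return failure != null;
        }

        /** Commit succeeds only if no operation in the transaction failed. */
        public synchronized void commit() throws TransactionFailedException {
            if (failure != null) {
                throw new TransactionFailedException(id, failure);
            }
            // A real broker would apply the transaction's work here.
        }
    }

    public static void main(String[] args) {
        TxState tx = new TxState("TX:1");
        // Simulate a send inside the transaction being rejected by a filter.
        tx.markFailed(new SecurityException("not authorized to write to queue DEF"));
        try {
            tx.commit();
            System.out.println("committed");
        } catch (TransactionFailedException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Compared with forcing a rollback, keeping the state around until the client commits or rolls back lets the broker report why the transaction cannot be committed.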
> client calls to createProducer() and send() successful even though
> BrokerFilter methods throw exceptions
> --------------------------------------------------------------------------------------------------------
>
> Key: AMQ-3166
> URL: https://issues.apache.org/jira/browse/AMQ-3166
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker, JMS client
> Affects Versions: 5.4.2, 5.5.0
> Reporter: Arthur Naseef
> Assignee: Arthur Naseef
> Attachments: AMQ3166Test.java, AMQ3166Test.java,
> FailedTransactionTracking.java, FailedTransactionTrackingPlugin.java
>
>
> Client calls to createProducer() always return without an error even though a
> BrokerFilter's addProducer() method throws an exception on the request. In
> contrast, createConsumer() throws an exception, as expected, when
> BrokerFilter's addConsumer() throws an exception.
> Clients using transacted sessions always return successfully from send() when
> a BrokerFilter's send() method throws an exception.
> Below is a broker configuration file using <authorizationPlugin> to
> illustrate the problem.
> To reproduce the problem with this configuration, a test client only needs to
> connect with user = "user" and password = "password", and then attempt to
> produce messages with a transacted session to any queue other than ABC (e.g.
> DEF).
> Tracing the cause of the issue has led to the finding that the client code for
> creating a producer uses an Async send for the producer information. The
> analogous code for consumers uses a Sync send.
> I will work on a patch. It would be very helpful to have feedback on the
> operation of the bus and the best way to resolve this problem. Based on my
> research, it seems that createProducer() should be using a Sync send in place
> of the Async one. Not yet sure about send(). Another possibility is to move
> the security operations to earlier in the internal broker flow.
> === SAMPLE BROKER XML ===
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans
>     http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>     http://activemq.apache.org/schema/core
>     http://activemq.apache.org/schema/core/activemq-core.xsd">
>   <broker xmlns="http://activemq.apache.org/schema/core"
>           brokerName="localhost"
>           dataDirectory="${activemq.base}/data"
>           destroyApplicationContextOnStop="true" >
>     <persistenceAdapter>
>       <kahaDB directory="${activemq.base}/data/kahadb"/>
>     </persistenceAdapter>
>
>     <plugins>
>       <simpleAuthenticationPlugin anonymousAccessAllowed="true">
>         <users>
>           <authenticationUser username="user" password="password" groups="users"/>
>         </users>
>       </simpleAuthenticationPlugin>
>       <authorizationPlugin>
>         <map>
>           <authorizationMap>
>             <authorizationEntries>
>               <authorizationEntry queue="ABC" read="users" write="users" admin="users" />
>               <authorizationEntry topic="ActiveMQ.Advisory.>" read="users" write="users" admin="users" />
>             </authorizationEntries>
>           </authorizationMap>
>         </map>
>       </authorizationPlugin>
>     </plugins>
>     <transportConnectors>
>       <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
>     </transportConnectors>
>   </broker>
> </beans>
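
The Async versus Sync send behaviour described in the quoted report can be sketched roughly as follows. This is not ActiveMQ client code: SyncVsAsyncSendSketch, brokerHandle, asyncSend, and syncSend are hypothetical names, and the "broker" is a plain method that rejects any command starting with "denied". It only shows why a one-way send returns normally even when the other side throws, while a request/response send surfaces the error to the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Sketch only: contrasts fire-and-forget with request/response sends. */
public class SyncVsAsyncSendSketch {

    /** Hypothetical broker side: rejects every command that starts with "denied". */
    static String brokerHandle(String command) {
        if (command.startsWith("denied")) {
            throw new SecurityException("not authorized: " + command);
        }
        return "ok: " + command;
    }

    /**
     * One-way send: the command is handed off and nobody waits for the
     * outcome, so a broker-side exception never reaches the caller.
     */
    public static void asyncSend(String command) {
        CompletableFuture.supplyAsync(() -> brokerHandle(command));
    }

    /**
     * Request/response send: the caller blocks for the reply, and a
     * broker-side exception is rethrown in the caller's thread.
     */
    public static String syncSend(String command) {
        try {
            return CompletableFuture.supplyAsync(() -> brokerHandle(command)).join();
        } catch (CompletionException e) {
            // join() wraps the original failure; unwrap it for the caller.
            throw (RuntimeException) e.getCause();
        }
    }

    public static void main(String[] args) {
        asyncSend("denied: addProducer"); // returns normally, failure is lost
        try {
            syncSend("denied: addConsumer");
        } catch (SecurityException e) {
            System.out.println("caller sees: " + e.getMessage());
        }
    }
}
```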