[ 
https://issues.apache.org/jira/browse/AMQ-3166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13917743#comment-13917743
 ] 

Arthur Naseef commented on AMQ-3166:
------------------------------------

I tried forcing a rollback on the transaction, and that causes the commit to 
fail. The exception seems misleading to me, though, because it states that the 
transaction has not been started:

{noformat}
javax.jms.JMSException: Transaction 'TX:ID:Arthur-Naseefs-MacBook-Pro.local-58321-1393821595757-5:1:1' has not been started.
{noformat}

Here's my patch:
{noformat}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/mai
index 65d044b..fc4a674 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -306,6 +306,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
                 responseRequired = false;
             }
 
+            // AMQ-3166: remember and propagate transaction failures.
+            serviceTransactionExceptions(command, e);
+
             if (responseRequired) {
                 response = new ExceptionResponse(e);
             } else {
@@ -1608,4 +1611,44 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     public WireFormatInfo getRemoteWireFormatInfo() {
         return wireFormatInfo;
     }
+
+    protected void serviceTransactionExceptions(Command command, Throwable thrown) {
+        if ( command instanceof Message ) {
+            Message msg = (Message) command;
+
+            if ( msg.isInTransaction() ) {
+                LOG.debug("marking transaction {} failed on message {} due to {}", msg.getTransactionId(),
+                          msg.getMessageId(), thrown.getMessage());
+                this.markTransactionFailure(msg.getConnection().getConnectionInfo().getConnectionId(),
+                                            msg.getTransactionId(), thrown);
+            }
+        } else if ( command instanceof MessageAck ) {
+            MessageAck ack = (MessageAck) command;
+
+            if ( ack.isInTransaction() ) {
+                LOG.debug("marking transaction {} failed on ack for messages {}..{} due to {}", ack.getTransactionId(),
+                          ack.getFirstMessageId(), ack.getLastMessageId(), thrown.getMessage());
+                this.markTransactionFailure(ack.getConsumerId().getParentId().getParentId(), ack.getTransactionId(),
+                                            thrown);
+            }
+        }
+
+        // TBD: any other commands?  MessagePull doesn't contain transaction info, and I suspect that it doesn't need
+        // it since it must be synchronous.
+    }
+
+    protected void markTransactionFailure(ConnectionId connId, TransactionId tId, Throwable thrown) {
+        TransactionInfo rollback = new TransactionInfo();
+        rollback.setConnectionId(connId);
+        rollback.setTransactionId(tId);
+        rollback.setType(TransactionInfo.ROLLBACK);
+
+        try {
+            // Reuse the existing rollback logic.
+            this.processRollbackTransaction(rollback);
+        } catch ( Throwable err ) {
+            LOG.warn("failed to force rollback on transaction {} after initial failure {}", tId, thrown.getMessage(),
+                     err);
+        }
+    }
 }
{noformat}

I'm looking at adding a "failed" status to TransactionState - that appears to 
be a feasible approach, and seems more correct as well.
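
As a rough illustration of the "failed" status idea, here is a self-contained 
sketch. It does not use the real TransactionState class; SketchTransactionState, 
markFailed(), isFailed() and the exception type are all names made up for this 
example. The point is only that a commit on a failed transaction could report 
the original cause rather than "has not been started":

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a broker-side transaction state.
// This is NOT the real ActiveMQ TransactionState API.
final class SketchTransactionState {
    private final String transactionId;
    // Holds the first failure seen inside the transaction, or null if none.
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    SketchTransactionState(String transactionId) {
        this.transactionId = transactionId;
    }

    // Record only the first failure; later failures keep the original cause.
    void markFailed(Throwable cause) {
        failure.compareAndSet(null, cause);
    }

    boolean isFailed() {
        return failure.get() != null;
    }

    // Refuse to commit once the transaction has been marked failed, and
    // surface the original cause in the exception.
    void commit() {
        Throwable cause = failure.get();
        if (cause != null) {
            throw new IllegalStateException(
                "Transaction '" + transactionId + "' failed and cannot be committed: "
                    + cause.getMessage(), cause);
        }
        // A real broker would commit the pending work here.
    }
}

final class FailedStateDemo {
    public static void main(String[] args) {
        SketchTransactionState tx = new SketchTransactionState("TX:example:1");
        // Illustrative failure; a real one would come from a BrokerFilter.
        tx.markFailed(new RuntimeException("send rejected by broker filter"));
        try {
            tx.commit();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Compared with the forced rollback in the patch above, a flag like this would 
keep the transaction known to the broker until the client commits or rolls 
back, which is why the error message can stay accurate.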

> client calls to createProducer() and send() successful even though 
> BrokerFilter methods throw exceptions
> --------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3166
>                 URL: https://issues.apache.org/jira/browse/AMQ-3166
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, JMS client
>    Affects Versions: 5.4.2, 5.5.0
>            Reporter: Arthur Naseef
>            Assignee: Arthur Naseef
>         Attachments: AMQ3166Test.java, AMQ3166Test.java, 
> FailedTransactionTracking.java, FailedTransactionTrackingPlugin.java
>
>
> Client calls to createProducer() always return without an error even though a 
> BrokerFilter's addProducer() method throws an exception on the request. In 
> contrast, createConsumer() throws an exception, as expected, when 
> BrokerFilter's addConsumer() throws an exception.
> Clients using transacted sessions always return successfully from send() when 
> a BrokerFilter's send() method throws an exception.
> Below is a broker configuration file using <authorizationPlugin> to 
> illustrate the problem.
> To reproduce the problem with this configuration, a test client only needs to 
> connect with user = "user" and password = "password", and then attempt to 
> produce messages with a transacted session to any queue other than ABC (e.g. 
> DEF).
> Tracing the cause of the issue shows that the client code for creating a 
> producer uses an Async send for the producer information, whereas the 
> analogous code for consumers uses a Sync send.
> I will work on a patch.  It would be very helpful to have feedback on the 
> operation of the bus and the best way to resolve this problem.  Based on my 
> research, it seems that createProducer() should be using a Sync send in place 
> of the Async one.  Not yet sure about send().  Another possibility is to move 
> the security operations to earlier in the internal broker flow.
> === SAMPLE BROKER XML ===
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans 
>   http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>   http://activemq.apache.org/schema/core 
>   http://activemq.apache.org/schema/core/activemq-core.xsd">
>     <broker xmlns="http://activemq.apache.org/schema/core"
>             brokerName="localhost"
>             dataDirectory="${activemq.base}/data"
>             destroyApplicationContextOnStop="true" >
>         <persistenceAdapter>
>             <kahaDB directory="${activemq.base}/data/kahadb"/>
>         </persistenceAdapter>
>         
>         <plugins>
>           <simpleAuthenticationPlugin anonymousAccessAllowed="true">
>               <users>
>                   <authenticationUser username="user" password="password"
>                       groups="users"/>
>               </users>
>           </simpleAuthenticationPlugin>
>           <authorizationPlugin>
>               <map>
>                   <authorizationMap>
>                     <authorizationEntries>
>                       <authorizationEntry queue="ABC" read="users" write="users" admin="users" />
>                       <authorizationEntry topic="ActiveMQ.Advisory.>" read="users" write="users" admin="users" />
>                     </authorizationEntries>
>                   </authorizationMap>
>               </map>
>           </authorizationPlugin>
>         </plugins>
>         <transportConnectors>
>             <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
>         </transportConnectors>
>     </broker>
> </beans>
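
To make the Async versus Sync difference from the quoted description concrete, 
here is a minimal, self-contained sketch. It does not use the ActiveMQ client 
classes; SketchBroker, SketchClient, asyncSend() and syncSend() are hypothetical 
names, and the broker stub simply rejects every request the way a throwing 
BrokerFilter would:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical broker stub: every request fails, like a BrokerFilter whose
// addProducer() or send() throws. Not part of ActiveMQ.
final class SketchBroker {
    CompletableFuture<Void> handle(String request) {
        CompletableFuture<Void> response = new CompletableFuture<>();
        response.completeExceptionally(new SecurityException("rejected: " + request));
        return response;
    }
}

// Hypothetical client showing the two send styles.
final class SketchClient {
    private final SketchBroker broker = new SketchBroker();

    // Async style: fire and forget. The failed response is never inspected,
    // so the caller returns normally even though the broker rejected it.
    void asyncSend(String request) {
        broker.handle(request);
    }

    // Sync style: wait for the response and rethrow the broker's failure.
    void syncSend(String request) {
        try {
            broker.handle(request).join();
        } catch (CompletionException e) {
            throw new IllegalStateException(e.getCause().getMessage(), e.getCause());
        }
    }
}

final class SendStyleDemo {
    public static void main(String[] args) {
        SketchClient client = new SketchClient();
        client.asyncSend("addProducer"); // returns without any error
        try {
            client.syncSend("addProducer");
        } catch (IllegalStateException e) {
            System.out.println("sync send failed: " + e.getMessage());
        }
    }
}
```

This matches the reported symptom only in outline: the asynchronous call 
returns successfully because nothing waits for the broker's response.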



