Author: gtully Date: Mon Feb 2 11:14:05 2009 New Revision: 739961 URL: http://svn.apache.org/viewvc?rev=739961&view=rev Log: Resolve issue with broken RA tests: allow XA operations on a non-transacted session; the check for an active transaction is now XA-aware.
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=739961&r1=739960&r2=739961&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Mon Feb 2 11:14:05 2009 @@ -592,7 +592,7 @@ */ public void close() throws JMSException { if (!unconsumedMessages.isClosed()) { - if (session.isTransacted() && session.getTransactionContext().getTransactionId() != null) { + if (session.getTransactionContext().isInTransaction()) { session.getTransactionContext().addSynchronization(new Synchronization() { public void afterCommit() throws Exception { doClose(); @@ -667,7 +667,7 @@ // Do we have any acks we need to send out before closing? // Ack any delivered messages now. 
- if (!session.isTransacted()) { + if (!session.getTransacted()) { deliverAcks(); if (session.isDupsOkAcknowledge()) { acknowledge(); @@ -752,7 +752,7 @@ synchronized(deliveredMessages) { deliveredMessages.addFirst(md); } - if (session.isTransacted()) { + if (session.getTransacted()) { ackLater(md, MessageAck.DELIVERED_ACK_TYPE); } } @@ -766,7 +766,7 @@ ackLater(md, MessageAck.DELIVERED_ACK_TYPE); } else { stats.onMessage(); - if (session.isTransacted()) { + if (session.getTransacted()) { // Do nothing. } else if (session.isAutoAcknowledge()) { synchronized (deliveredMessages) { @@ -830,7 +830,7 @@ // Don't acknowledge now, but we may need to let the broker know the // consumer got the message // to expand the pre-fetch window - if (session.isTransacted()) { + if (session.getTransacted()) { session.doStartTransaction(); if (!synchronizationRegistered) { synchronizationRegistered = true; @@ -892,7 +892,7 @@ if (ack == null) return; // no msgs - if (session.isTransacted()) { + if (session.getTransacted()) { session.doStartTransaction(); ack.setTransactionId(session.getTransactionContext().getTransactionId()); } @@ -903,7 +903,7 @@ deliveredCounter -= deliveredMessages.size(); additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); - if (!session.isTransacted()) { + if (!session.getTransacted()) { deliveredMessages.clear(); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=739961&r1=739960&r2=739961&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Mon Feb 2 11:14:05 2009 @@ -1955,7 +1955,7 @@ } protected void sendAck(MessageAck ack, boolean lazy) 
throws JMSException { - if (lazy || connection.isSendAcksAsync() || isTransacted()) { + if (lazy || connection.isSendAcksAsync() || getTransacted()) { asyncSendPacket(ack); } else { syncSendPacket(ack); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=739961&r1=739960&r2=739961&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java Mon Feb 2 11:14:05 2009 @@ -90,6 +90,10 @@ return transactionId != null && transactionId.isLocalTransaction(); } + public boolean isInTransaction() { + return transactionId != null; + } + /** * @return Returns the localTransactionEventListener. */ Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java?rev=739961&r1=739960&r2=739961&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java Mon Feb 2 11:14:05 2009 @@ -203,13 +203,14 @@ } session.close(); } + consumeMessage(connection, maxRetries + 1); } } // AMQ-1593 public void testValidateRedeliveryCountOnRollbackWithPrefetch0() throws Exception { - final int numMessages = 1; + final int numMessages = 1; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl + "?jms.prefetchPolicy.queuePrefetch=0"); Connection connection = connectionFactory.createConnection(); @@ 
-233,37 +234,55 @@ } session.close(); } + + consumeMessage(connection, maxRetries + 1); } } + + private void consumeMessage(Connection connection, final int deliveryCount) + throws JMSException { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(destinationName); + MessageConsumer consumer = session.createConsumer(destination); + TextMessage msg = (TextMessage) consumer.receive(1000); + assertNotNull(msg); + assertEquals("redelivery property matches deliveries", deliveryCount, msg.getLongProperty("JMSXDeliveryCount")); + session.commit(); + session.close(); + } + public void testRedeliveryPropertyWithNoRollback() throws Exception { + final int numMessages = 1; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl); Connection connection = connectionFactory.createConnection(); connection.start(); - populateDestination(nbMessages, destinationName, connection); + populateDestination(numMessages, destinationName, connection); connection.close(); { AtomicInteger received = new AtomicInteger(); - while (received.get() < nbMessages) { + final int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries(); + while (received.get() < maxRetries) { connection = connectionFactory.createConnection(); connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Destination destination = session.createQueue(destinationName); MessageConsumer consumer = session.createConsumer(destination); TextMessage msg = (TextMessage) consumer.receive(2000); if (msg != null) { - LOG.info("Received message " + msg.getText() + - " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); - assertFalse(msg.getJMSRedelivered()); - assertEquals(1, msg.getLongProperty("JMSXDeliveryCount")); + LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" 
+ msg.getJMSMessageID()); + assertEquals("redelivery property matches deliveries", received.get(), msg.getLongProperty("JMSXDeliveryCount")); } session.close(); connection.close(); } + connection = connectionFactory.createConnection(); + connection.start(); + consumeMessage(connection, maxRetries + 1); } } Modified: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java?rev=739961&r1=739960&r2=739961&view=diff ============================================================================== --- activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java (original) +++ activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java Mon Feb 2 11:14:05 2009 @@ -47,15 +47,6 @@ private static long txGenerator; private Xid xid; - - // TODO fix for XA - public void testReceiveTwoThenCloseConnection() throws Exception {} - public void testReceiveRollback() throws Exception {} - public void testReceiveTwoThenRollback() throws Exception {} - public void testReceiveTwoThenRollbackManyTimes() throws Exception {} - public void testReceiveRollbackWithPrefetchOfOne() throws Exception {} - public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception {} - @Override protected void setSessionTransacted() { resourceProvider.setTransacted(false);