Repository: activemq
Updated Branches:
  refs/heads/activemq-5.15.x 4e627d28f -> 29fbeb511


KahaDB Recovery can experience a dangling transaction when prepare and commit 
occur on different pagefiles.

Signed-off-by: gtully <[email protected]>
(cherry picked from commit 99db9ef301c8e8a7156d5c5a71875958f412792a)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cbe486fb
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cbe486fb
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cbe486fb

Branch: refs/heads/activemq-5.15.x
Commit: cbe486fb9d1c4e87a72e6b3c9eea627667eebbad
Parents: 4e627d2
Author: jgoodyear <[email protected]>
Authored: Fri Oct 5 14:04:21 2018 -0230
Committer: Christopher L. Shannon (cshannon) <[email protected]>
Committed: Wed Oct 10 10:23:16 2018 -0400

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  11 +
 .../org/apache/activemq/bugs/AMQ7067Test.java   | 360 +++++++++++++++++++
 .../apache/activemq/bugs/amq7067/activemq.xml   | 111 ++++++
 3 files changed, 482 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/cbe486fb/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 8676ac2..82c4865 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1408,6 +1408,9 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
         } finally {
             indexLock.writeLock().unlock();
         }
+        for (Operation op: inflightTx) {
+            recordAckMessageReferenceLocation(location, op.getLocation());
+        }
     }
 
     @SuppressWarnings("rawtypes")
@@ -1417,6 +1420,9 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
             List<Operation> tx = inflightTransactions.remove(key);
             if (tx != null) {
                 preparedTransactions.put(key, tx);
+                for (Operation op: tx) {
+                    recordAckMessageReferenceLocation(location, 
op.getLocation());
+                }
             }
         }
     }
@@ -1431,6 +1437,11 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
                 updates = preparedTransactions.remove(key);
             }
         }
+        if (updates != null) {
+            for(Operation op : updates) {
+                recordAckMessageReferenceLocation(location, op.getLocation());
+            }
+        }
     }
 
     protected void process(KahaRewrittenDataFileCommand command, Location 
location)  throws IOException {

http://git-wip-us.apache.org/repos/asf/activemq/blob/cbe486fb/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
new file mode 100644
index 0000000..c1f34d0
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
@@ -0,0 +1,360 @@
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQXAConnection;
+import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.jmx.PersistenceAdapterViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.util.JMXSupport;
+import org.apache.commons.lang.StringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.*;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.util.Random;
+
+import static javax.transaction.xa.XAResource.*;
+import static org.junit.Assert.assertEquals;
+
+public class AMQ7067Test {
+
+    protected static Random r = new Random();
+    final static String WIRE_LEVEL_ENDPOINT = "tcp://localhost:61616";
+    protected BrokerService broker;
+    protected ActiveMQXAConnection connection;
+    protected XASession xaSession;
+    protected XAResource xaRes;
+    private final String xbean = "xbean:";
+    private final String confBase = 
"src/test/resources/org/apache/activemq/bugs/amq7067";
+
+
+    private static final ActiveMQXAConnectionFactory 
ACTIVE_MQ_CONNECTION_FACTORY;
+    private static final ActiveMQConnectionFactory 
ACTIVE_MQ_NON_XA_CONNECTION_FACTORY;
+
+    static {
+        ACTIVE_MQ_CONNECTION_FACTORY = new 
ActiveMQXAConnectionFactory(WIRE_LEVEL_ENDPOINT);
+        ACTIVE_MQ_NON_XA_CONNECTION_FACTORY = new 
ActiveMQConnectionFactory(WIRE_LEVEL_ENDPOINT);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        deleteData(new File("target/data"));
+        createBroker();
+    }
+
+    @After
+    public void shutdown() throws Exception {
+        broker.stop();
+    }
+
+    public void setupXAConnection() throws Exception {
+        connection = (ActiveMQXAConnection) 
ACTIVE_MQ_CONNECTION_FACTORY.createXAConnection();
+        connection.start();
+        xaSession = connection.createXASession();
+        xaRes = xaSession.getXAResource();
+    }
+
+    private void createBroker() throws Exception {
+        broker = new BrokerService();
+        broker = BrokerFactory.createBroker(xbean + confBase + 
"/activemq.xml");
+        broker.start();
+    }
+
+
+    @Test
+    public void testAMQ7067XAcommit() throws Exception {
+
+        PersistenceAdapterViewMBean kahadbView = 
getProxyToPersistenceAdapter(broker.getPersistenceAdapter().toString());
+        setupXAConnection();
+
+        Queue holdKahaDb = xaSession.createQueue("holdKahaDb");
+        createDanglingTransaction(xaRes, xaSession, holdKahaDb);
+
+        MessageProducer holdKahaDbProducer = 
xaSession.createProducer(holdKahaDb);
+
+        XATransactionId txid = createXATransaction();
+        System.out.println("****** create new txid = " + txid);
+        xaRes.start(txid, TMNOFLAGS);
+
+        TextMessage helloMessage = 
xaSession.createTextMessage(StringUtils.repeat("a", 10));
+        holdKahaDbProducer.send(helloMessage);
+        xaRes.end(txid, TMSUCCESS);
+        xaRes.prepare(txid);
+
+        Queue queue = xaSession.createQueue("test");
+
+        produce(xaRes, xaSession, queue, 100, 512 * 1024);
+        xaRes.commit(txid, false);
+        produce(xaRes, xaSession, queue, 100, 512 * 1024);
+
+        ((org.apache.activemq.broker.region.Queue) 
broker.getRegionBroker().getDestinationMap().get(queue)).purge();
+
+        Xid[] xids = xaRes.recover(TMSTARTRSCAN);
+
+        //Should be 1 since we have only 1 prepared
+        assertEquals(1, xids.length);
+        connection.close();
+
+        broker.stop();
+        broker.waitUntilStopped();
+        createBroker();
+
+        setupXAConnection();
+        xids = xaRes.recover(TMSTARTRSCAN);
+
+        // THIS SHOULD NOT FAIL AS THERE SHOUL DBE ONLY 1 TRANSACTION!
+        assertEquals(1, xids.length);
+
+    }
+
+    @Test
+    public void testAMQ7067XArollback() throws Exception {
+
+        PersistenceAdapterViewMBean kahadbView = 
getProxyToPersistenceAdapter(broker.getPersistenceAdapter().toString());
+        setupXAConnection();
+
+        Queue holdKahaDb = xaSession.createQueue("holdKahaDb");
+        createDanglingTransaction(xaRes, xaSession, holdKahaDb);
+
+        MessageProducer holdKahaDbProducer = 
xaSession.createProducer(holdKahaDb);
+
+        XATransactionId txid = createXATransaction();
+        System.out.println("****** create new txid = " + txid);
+        xaRes.start(txid, TMNOFLAGS);
+
+        TextMessage helloMessage = 
xaSession.createTextMessage(StringUtils.repeat("a", 10));
+        holdKahaDbProducer.send(helloMessage);
+        xaRes.end(txid, TMSUCCESS);
+        xaRes.prepare(txid);
+
+        Queue queue = xaSession.createQueue("test");
+
+        produce(xaRes, xaSession, queue, 100, 512 * 1024);
+        xaRes.rollback(txid);
+        produce(xaRes, xaSession, queue, 100, 512 * 1024);
+
+        ((org.apache.activemq.broker.region.Queue) 
broker.getRegionBroker().getDestinationMap().get(queue)).purge();
+
+        Xid[] xids = xaRes.recover(TMSTARTRSCAN);
+
+        //Should be 1 since we have only 1 prepared
+        assertEquals(1, xids.length);
+        connection.close();
+
+        broker.stop();
+        broker.waitUntilStopped();
+        createBroker();
+
+        setupXAConnection();
+        xids = xaRes.recover(TMSTARTRSCAN);
+
+        // THIS SHOULD NOT FAIL AS THERE SHOULD BE ONLY 1 TRANSACTION!
+        assertEquals(1, xids.length);
+
+    }
+
+    @Test
+    public void testAMQ7067commit() throws Exception {
+        final Connection connection = 
ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(true, 
Session.CLIENT_ACKNOWLEDGE);
+        Queue holdKahaDb = session.createQueue("holdKahaDb");
+        MessageProducer holdKahaDbProducer = 
session.createProducer(holdKahaDb);
+        TextMessage helloMessage = 
session.createTextMessage(StringUtils.repeat("a", 10));
+        holdKahaDbProducer.send(helloMessage);
+        Queue queue = session.createQueue("test");
+        produce(connection, queue, 100, 512*1024);
+        session.commit();
+        produce(connection, queue, 100, 512*1024);
+
+        System.out.println(String.format("QueueSize %s: %d", 
holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
+        purgeQueue(queue.getQueueName());
+        Thread.sleep(10000);
+
+        curruptIndexFile(getDataDirectory());
+
+
+        while(true) {
+            try {
+                Thread.sleep(10000);
+                System.out.println(String.format("QueueSize %s: %d", 
holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
+                break;
+            } catch (Exception ex) {
+                System.out.println(ex.getMessage());
+                break;
+            }
+        }
+
+        connection.close();
+
+        // THIS SHOULD NOT FAIL AS THERE SHOULD BE ONLY 1 TRANSACTION!
+        assertEquals(1, getQueueSize(holdKahaDb.getQueueName()));
+    }
+
+    @Test
+    public void testAMQ7067rollback() throws Exception {
+        final Connection connection = 
ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(true, 
Session.CLIENT_ACKNOWLEDGE);
+        Queue holdKahaDb = session.createQueue("holdKahaDb");
+        MessageProducer holdKahaDbProducer = 
session.createProducer(holdKahaDb);
+        TextMessage helloMessage = 
session.createTextMessage(StringUtils.repeat("a", 10));
+        holdKahaDbProducer.send(helloMessage);
+        Queue queue = session.createQueue("test");
+        produce(connection, queue, 100, 512*1024);
+        session.rollback();
+        produce(connection, queue, 100, 512*1024);
+
+        System.out.println(String.format("QueueSize %s: %d", 
holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
+        purgeQueue(queue.getQueueName());
+        Thread.sleep(10000);
+
+        curruptIndexFile(getDataDirectory());
+
+
+        while(true) {
+            try {
+                Thread.sleep(10000);
+                System.out.println(String.format("QueueSize %s: %d", 
holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
+                break;
+            } catch (Exception ex) {
+                System.out.println(ex.getMessage());
+                break;
+            }
+        }
+
+        connection.close();
+
+        // THIS SHOULD NOT FAIL AS THERE SHOULD ZERO TRANSACTION!
+        assertEquals(0, getQueueSize(holdKahaDb.getQueueName()));
+    }
+
+    protected static void createDanglingTransaction(XAResource xaRes, 
XASession xaSession, Queue queue) throws JMSException, IOException, XAException 
{
+        MessageProducer producer = xaSession.createProducer(queue);
+        XATransactionId txId = createXATransaction();
+        xaRes.start(txId, TMNOFLAGS);
+
+        TextMessage helloMessage = 
xaSession.createTextMessage(StringUtils.repeat("dangler", 10));
+        producer.send(helloMessage);
+        xaRes.end(txId, TMSUCCESS);
+        xaRes.prepare(txId);
+        System.out.println("****** createDanglingTransaction txId = " + txId);
+    }
+
+    protected static void produce(XAResource xaRes, XASession xaSession, Queue 
queue, int messageCount, int messageSize) throws JMSException, IOException, 
XAException {
+        MessageProducer producer = xaSession.createProducer(queue);
+
+        for (int i = 0; i < messageCount; i++) {
+            XATransactionId txid = createXATransaction();
+            xaRes.start(txid, TMNOFLAGS);
+
+            TextMessage helloMessage = 
xaSession.createTextMessage(StringUtils.repeat("a", messageSize));
+            producer.send(helloMessage);
+            xaRes.end(txid, TMSUCCESS);
+            xaRes.prepare(txid);
+            xaRes.commit(txid, true);
+        }
+    }
+
+    protected static void produce(Connection connection, Queue queue, int 
messageCount, int messageSize) throws JMSException, IOException, XAException {
+        Session session = connection.createSession(true, 
Session.CLIENT_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(queue);
+
+        for (int i = 0; i < messageCount; i++) {
+            TextMessage helloMessage = 
session.createTextMessage(StringUtils.repeat("a", messageSize));
+            producer.send(helloMessage);
+            session.commit();
+
+        }
+    }
+
+    protected static XATransactionId createXATransaction() throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream os = new DataOutputStream(baos);
+        os.writeLong(r.nextInt());
+        os.close();
+        byte[] bs = baos.toByteArray();
+
+        XATransactionId xid = new XATransactionId();
+        xid.setBranchQualifier(bs);
+        xid.setGlobalTransactionId(bs);
+        xid.setFormatId(55);
+        return xid;
+    }
+
+    private RecoveredXATransactionViewMBean 
getProxyToPreparedTransactionViewMBean(TransactionId xid) throws 
MalformedObjectNameException, JMSException {
+
+        ObjectName objectName = new 
ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,transactionType=RecoveredXaTransaction,xid="
 +
+                JMXSupport.encodeObjectNamePart(xid.toString()));
+        RecoveredXATransactionViewMBean proxy = 
(RecoveredXATransactionViewMBean) 
broker.getManagementContext().newProxyInstance(objectName,
+                RecoveredXATransactionViewMBean.class, true);
+        return proxy;
+    }
+
+    private PersistenceAdapterViewMBean getProxyToPersistenceAdapter(String 
name) throws MalformedObjectNameException, JMSException {
+        return (PersistenceAdapterViewMBean) 
broker.getManagementContext().newProxyInstance(
+                
BrokerMBeanSupport.createPersistenceAdapterName(broker.getBrokerObjectName().toString(),
 name),
+                PersistenceAdapterViewMBean.class, true);
+    }
+
+    private void deleteData(File file) throws Exception {
+        String[] entries = file.list();
+        if (entries == null) return;
+        for (String s : entries) {
+            File currentFile = new File(file.getPath(), s);
+            if (currentFile.isDirectory()) {
+                deleteData(currentFile);
+            }
+            currentFile.delete();
+        }
+        file.delete();
+    }
+
+    private long getQueueSize(final String queueName) throws 
MalformedObjectNameException {
+        ObjectName objectName = new 
ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
 + JMXSupport.encodeObjectNamePart(queueName));
+        DestinationViewMBean proxy = (DestinationViewMBean) 
broker.getManagementContext().newProxyInstance(objectName, 
DestinationViewMBean.class, true);
+        return proxy.getQueueSize();
+    }
+
+    private void purgeQueue(final String queueName) throws 
MalformedObjectNameException, Exception {
+        ObjectName objectName = new 
ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
 + JMXSupport.encodeObjectNamePart(queueName));
+        QueueViewMBean proxy = (QueueViewMBean) 
broker.getManagementContext().newProxyInstance(objectName, 
QueueViewMBean.class, true);
+        proxy.purge();
+    }
+
+    private String getDataDirectory() throws MalformedObjectNameException {
+        ObjectName objectName = new 
ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
+        BrokerViewMBean proxy = (BrokerViewMBean) 
broker.getManagementContext().newProxyInstance(objectName, 
BrokerViewMBean.class, true);
+        return proxy.getDataDirectory();
+    }
+
+    protected static void curruptIndexFile(final String dataPath) throws 
FileNotFoundException, UnsupportedEncodingException {
+        PrintWriter writer = new 
PrintWriter(String.format("%s/kahadb/db.data", dataPath), "UTF-8");
+        writer.println("asdasdasd");
+        writer.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/cbe486fb/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq7067/activemq.xml
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq7067/activemq.xml
 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq7067/activemq.xml
new file mode 100644
index 0000000..70da26b
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq7067/activemq.xml
@@ -0,0 +1,111 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!-- START SNIPPET: example -->
+<beans
+        xmlns="http://www.springframework.org/schema/beans";
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+        xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd
+  http://activemq.apache.org/schema/core 
http://activemq.apache.org/schema/core/activemq-core.xsd";>
+
+    <!--
+        The <broker> element is used to configure the ActiveMQ broker.
+    -->
+    <broker xmlns="http://activemq.apache.org/schema/core"; 
brokerName="localhost" dataDirectory="target/data">
+
+        <destinationPolicy>
+            <policyMap>
+                <policyEntries>
+                    <policyEntry topic=">" >
+                        <!-- The constantPendingMessageLimitStrategy is used 
to prevent
+                             slow topic consumers to block producers and 
affect other consumers
+                             by limiting the number of messages that are 
retained
+                             For more information, see:
+
+                             
http://activemq.apache.org/slow-consumer-handling.html
+
+                        -->
+                        <pendingMessageLimitStrategy>
+                            <constantPendingMessageLimitStrategy limit="1000"/>
+                        </pendingMessageLimitStrategy>
+                    </policyEntry>
+                </policyEntries>
+            </policyMap>
+        </destinationPolicy>
+
+
+        <!--
+            The managementContext is used to configure how ActiveMQ is exposed 
in
+            JMX. By default, ActiveMQ uses the MBean server that is started by
+            the JVM. For more information, see:
+
+            http://activemq.apache.org/jmx.html
+        -->
+        <managementContext>
+            <managementContext createConnector="false"/>
+        </managementContext>
+
+        <!--
+            Configure message persistence for the broker. The default 
persistence
+            mechanism is the KahaDB store (identified by the kahaDB tag).
+            For more information, see:
+
+            http://activemq.apache.org/persistence.html
+        -->
+        <persistenceAdapter>
+            <kahaDB directory="target/data/kahadb" />
+        </persistenceAdapter>
+
+
+        <!--
+          The systemUsage controls the maximum amount of space the broker will
+          use before disabling caching and/or slowing down producers. For more 
information, see:
+          http://activemq.apache.org/producer-flow-control.html
+        -->
+        <systemUsage>
+            <systemUsage>
+                <memoryUsage>
+                    <memoryUsage percentOfJvmHeap="70" />
+                </memoryUsage>
+                <storeUsage>
+                    <storeUsage limit="100 gb"/>
+                </storeUsage>
+                <tempUsage>
+                    <tempUsage limit="50 gb"/>
+                </tempUsage>
+            </systemUsage>
+        </systemUsage>
+
+        <!--
+            The transport connectors expose ActiveMQ over a given protocol to
+            clients and other brokers. For more information, see:
+
+            http://activemq.apache.org/configuring-transports.html
+        -->
+        <transportConnectors>
+            <!-- DOS protection, limit concurrent connections to 1000 and 
frame size to 100MB -->
+            <transportConnector name="openwire" 
uri="tcp://localhost:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
+        </transportConnectors>
+
+        <!-- destroy the spring context on shutdown to stop jetty -->
+        <shutdownHooks>
+            <bean xmlns="http://www.springframework.org/schema/beans"; 
class="org.apache.activemq.hooks.SpringContextHook" />
+        </shutdownHooks>
+
+    </broker>
+
+</beans>
+        <!-- END SNIPPET: example -->
\ No newline at end of file

Reply via email to