Repository: activemq Updated Branches: refs/heads/master 6d4459a00 -> 612d4aeeb
AMQ-7118 This closes #327 - with thanks to Heath Kesler Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/612d4aee Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/612d4aee Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/612d4aee Branch: refs/heads/master Commit: 612d4aeeb4025d62f75a301a655112b6e53c7170 Parents: 6d4459a Author: jgoodyear <jamie.goody...@gmail.com> Authored: Tue Dec 4 12:24:24 2018 -0330 Committer: jgoodyear <jamie.goody...@gmail.com> Committed: Tue Dec 4 12:54:30 2018 -0330 ---------------------------------------------------------------------- .../activemq/store/kahadb/MessageDatabase.java | 13 +- .../org/apache/activemq/bugs/AMQ7118Test.java | 228 +++++++++++++++++++ .../apache/activemq/bugs/amq7118/activemq.xml | 109 +++++++++ 3 files changed, 349 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/612d4aee/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 21027c6..c7ca4fe 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -148,6 +148,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe protected Location ackMessageFileMapLocation = null; protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync(); protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<>(); + protected transient AtomicBoolean ackMessageFileMapDirtyFlag = new AtomicBoolean(false); protected int version = VERSION; protected int openwireVersion = OpenWireFormat.DEFAULT_STORE_VERSION; @@ -825,6 +826,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation); ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput()); metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject(); + metadata.ackMessageFileMapDirtyFlag.lazySet(true); requiresReplay = false; } catch (Exception e) { LOG.warn("Cannot recover ackMessageFileMap", e); @@ -1631,6 +1633,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe referenceFileIds = new HashSet<>(); referenceFileIds.add(messageLocation.getDataFileId()); metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds); + metadata.ackMessageFileMapDirtyFlag.lazySet(true); + } else { Integer id = Integer.valueOf(messageLocation.getDataFileId()); if (!referenceFileIds.contains(id)) { @@ -1757,7 +1761,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe metadata.state = OPEN_STATE; metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit(); - metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap(); + if (metadata.ackMessageFileMapDirtyFlag.get() || (metadata.ackMessageFileMapLocation == null)) { + metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap(); + } + metadata.ackMessageFileMapDirtyFlag.lazySet(false); Location[] inProgressTxRange = getInProgressTxLocationRange(); metadata.firstInProgressTransactionLocation = inProgressTxRange[0]; tx.store(metadata.page, metadataMarshaller, true); @@ -1928,6 +1935,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } if (gcCandidateSet.contains(candidate)) { ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null); + metadata.ackMessageFileMapDirtyFlag.lazySet(true); } else { if (LOG.isTraceEnabled()) { LOG.trace("not removing data file: " + candidate @@ -1942,6 +1950,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe for (Integer candidate : gcCandidateSet) { for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) { ackMessageFileMapMod |= ackFiles.remove(candidate); + metadata.ackMessageFileMapDirtyFlag.lazySet(true); } } if (ackMessageFileMapMod) { @@ -2146,6 +2155,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe referenceFileIds = new HashSet<>(); referenceFileIds.addAll(entry.getValue()); metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds); + metadata.ackMessageFileMapDirtyFlag.lazySet(true); } else { referenceFileIds.addAll(entry.getValue()); } @@ -2154,6 +2164,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // remove the old location data from the ack map so that the old journal log file can // be removed on next GC. metadata.ackMessageFileMap.remove(journalToRead); + metadata.ackMessageFileMapDirtyFlag.lazySet(true); indexLock.writeLock().unlock(); http://git-wip-us.apache.org/repos/asf/activemq/blob/612d4aee/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7118Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7118Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7118Test.java new file mode 100644 index 0000000..0e845d2 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7118Test.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.commons.lang.StringUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.*; +import java.io.File; +import java.io.FilenameFilter; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +public class AMQ7118Test { + + protected static final Logger LOG = LoggerFactory.getLogger(AMQ7118Test.class); + + protected static Random r = new Random(); + final static String WIRE_LEVEL_ENDPOINT = "tcp://localhost:61616"; + protected BrokerService broker; + protected Connection producerConnection; + protected Session pSession; + protected Connection cConnection; + protected Session cSession; + private final String xbean = "xbean:"; + private final String confBase = "src/test/resources/org/apache/activemq/bugs/amq7118"; + int checkpointIndex = 0; + + private static final ActiveMQConnectionFactory ACTIVE_MQ_CONNECTION_FACTORY = new ActiveMQConnectionFactory(WIRE_LEVEL_ENDPOINT); + + @Before + public void setup() throws Exception { + deleteData(new File("target/data")); + createBroker(); + } + + @After + public void shutdown() throws Exception { + broker.stop(); + } + + public void setupProducerConnection() throws Exception { + producerConnection = ACTIVE_MQ_CONNECTION_FACTORY.createConnection(); + producerConnection.start(); + pSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + public void setupConsumerConnection() throws Exception { + cConnection = ACTIVE_MQ_CONNECTION_FACTORY.createConnection(); + cConnection.setClientID("myClient1"); + cConnection.start(); + cSession = cConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + private void createBroker() throws Exception { + broker = new BrokerService(); + broker = BrokerFactory.createBroker(xbean + confBase + "/activemq.xml"); + broker.start(); + } + + + @Test + public void testCompaction() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + + setupProducerConnection(); + setupConsumerConnection(); + + Topic topic = pSession.createTopic("test"); + + MessageConsumer consumer = cSession.createDurableSubscriber(topic, "clientId1"); + + LOG.info("Produce message to test topic"); + produce(pSession, topic, 1, 512 ); // just one message + + LOG.info("Consume message from test topic"); + Message msg = consumer.receive(5000); + assertNotNull(msg); + + LOG.info("Produce more messages to test topic and get into PFC"); + boolean sent = produce(cSession, topic, 20, 512 * 1024); // Fill the store + + assertFalse("Never got to PFC condition", sent); + + LOG.info("PFC hit"); + + //We hit PFC, so shut down the producer + producerConnection.close(); + + //Lets check the db-*.log file count before checkpointUpdate + checkFiles(false, 21, "db-21.log"); + + // Force checkFiles update + checkFiles(true, 23, "db-23.log"); + + //The ackMessageFileMap should be clean, so no more writing + checkFiles(true, 23, "db-23.log"); + + //One more time just to be sure - The ackMessageFileMap should be clean, so no more writing + checkFiles(true, 23, "db-23.log"); + + //Read out the rest of the messages + LOG.info("Consuming the rest of the files..."); + for (int i = 0; i < 20; i++) { + msg = consumer.receive(5000); + } + LOG.info("All messages Consumed."); + + //Clean up the log files and be sure its stable + checkFiles(true, 2, "db-33.log"); + checkFiles(true, 3, "db-34.log"); + checkFiles(true, 2, "db-34.log"); + checkFiles(true, 2, "db-34.log"); + checkFiles(true, 2, "db-34.log"); + + broker.stop(); + broker.waitUntilStopped(); + } + + protected static boolean produce(Session session, Topic topic, int messageCount, int messageSize) throws JMSException { + MessageProducer producer = session.createProducer(topic); + + for (int i = 0; i < messageCount; i++) { + TextMessage helloMessage = session.createTextMessage(StringUtils.repeat("a", messageSize)); + + try { + producer.send(helloMessage); + } catch (ResourceAllocationException e){ + return false; + } + } + + return true; + } + + private void deleteData(File file) { + String[] entries = file.list(); + if (entries == null) return; + for (String s : entries) { + File currentFile = new File(file.getPath(), s); + if (currentFile.isDirectory()) { + deleteData(currentFile); + } + currentFile.delete(); + } + file.delete(); + } + + private void checkFiles(boolean doCheckpoint, int expectedCount, String lastFileName) throws Exception { + + File dbfiles = new File("target/data/kahadb"); + FilenameFilter lff = new FilenameFilter(){ + @Override + public boolean accept(File dir, String name) { + return name.toLowerCase().startsWith("db-") && name.toLowerCase().endsWith("log"); + } + }; + + if(doCheckpoint) { + LOG.info("Initiating checkpointUpdate "+ ++checkpointIndex + " ..."); + broker.getPersistenceAdapter().checkpoint(true); + TimeUnit.SECONDS.sleep(2); + LOG.info("Checkpoint complete."); + } + File files[] = dbfiles.listFiles(lff); + Arrays.sort(files, new DBFileComparator() ); + logfiles(files); + assertEquals(expectedCount, files.length); + assertEquals(lastFileName, files[files.length-1].getName()); + + } + + private void logfiles(File[] files){ + + LOG.info("Files found in KahaDB:"); + for (File file : files) { + LOG.info(" " + file.getName()); + } + } + + private class DBFileComparator implements Comparator<File> { + @Override + public int compare(File o1, File o2) { + int n1 = extractNumber(o1.getName()); + int n2 = extractNumber(o2.getName()); + return n1 - n2; + } + + private int extractNumber(String name) { + int i = 0; + try { + int s = name.indexOf('-')+1; + int e = name.lastIndexOf('.'); + String number = name.substring(s, e); + i = Integer.parseInt(number); + } catch(Exception e) { + i = 0; // if filename does not match the format + // then default to 0 + } + return i; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/612d4aee/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq7118/activemq.xml ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq7118/activemq.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq7118/activemq.xml new file mode 100644 index 0000000..b610ae0 --- /dev/null +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq7118/activemq.xml @@ -0,0 +1,109 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<!-- START SNIPPET: example --> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> + + <!-- + The <broker> element is used to configure the ActiveMQ broker. + --> + <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="target/data"> + + <destinationPolicy> + <policyMap> + <policyEntries> + <policyEntry topic=">" > + <!-- The constantPendingMessageLimitStrategy is used to prevent + slow topic consumers to block producers and affect other consumers + by limiting the number of messages that are retained + For more information, see: + + http://activemq.apache.org/slow-consumer-handling.html + + --> + <pendingMessageLimitStrategy> + <constantPendingMessageLimitStrategy limit="1000"/> + </pendingMessageLimitStrategy> + </policyEntry> + </policyEntries> + </policyMap> + </destinationPolicy> + + + <!-- + The managementContext is used to configure how ActiveMQ is exposed in + JMX. By default, ActiveMQ uses the MBean server that is started by + the JVM. For more information, see: + + http://activemq.apache.org/jmx.html + --> + <managementContext> + <managementContext createConnector="false"/> + </managementContext> + + <!-- + Configure message persistence for the broker. The default persistence + mechanism is the KahaDB store (identified by the kahaDB tag). + For more information, see: + + http://activemq.apache.org/persistence.html + --> + <persistenceAdapter> + <kahaDB directory="target/data/kahadb" journalMaxFileLength="1k"/> + </persistenceAdapter> + + + <!-- + The systemUsage controls the maximum amount of space the broker will + use before disabling caching and/or slowing down producers. For more information, see: + http://activemq.apache.org/producer-flow-control.html + --> + <systemUsage> + <systemUsage sendFailIfNoSpace="true"> + <memoryUsage> + <memoryUsage percentOfJvmHeap="70" /> + </memoryUsage> + <storeUsage> + <storeUsage limit="10 mb" total="10000000"/> + </storeUsage> + </systemUsage> + </systemUsage> + + <!-- + The transport connectors expose ActiveMQ over a given protocol to + clients and other brokers. For more information, see: + + http://activemq.apache.org/configuring-transports.html + --> + <transportConnectors> + <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> + <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> + </transportConnectors> + + <!-- destroy the spring context on shutdown to stop jetty --> + <shutdownHooks> + <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" /> + </shutdownHooks> + + </broker> + +</beans> +<!-- END SNIPPET: example --> +