ARTEMIS-319 Improving files allocation and implementing journal-pool-files https://issues.apache.org/jira/browse/ARTEMIS-319
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/351bcfc9 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/351bcfc9 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/351bcfc9 Branch: refs/heads/master Commit: 351bcfc9f98e353e4e2e41974969127e29ecef94 Parents: 0b22433 Author: Clebert Suconic <[email protected]> Authored: Tue Nov 24 19:02:42 2015 -0500 Committer: Clebert Suconic <[email protected]> Committed: Thu Dec 10 16:49:58 2015 -0500 ---------------------------------------------------------------------- .../cli/commands/tools/CompactJournal.java | 2 +- .../cli/commands/tools/DecodeJournal.java | 2 +- .../cli/commands/tools/EncodeJournal.java | 2 +- .../cli/commands/tools/XmlDataExporter.java | 2 +- .../artemis/cli/commands/etc/broker.xml | 4 +- .../config/ActiveMQDefaultConfiguration.java | 9 + .../journal/JMSJournalStorageManagerImpl.java | 2 +- .../artemis/core/io/aio/AIOSequentialFile.java | 7 +- .../journal/impl/JournalFilesRepository.java | 10 +- .../artemis/core/journal/impl/JournalImpl.java | 137 +++--- artemis-native/bin/libartemis-native-32.so | Bin 26721 -> 26789 bytes artemis-native/bin/libartemis-native-64.so | Bin 23497 -> 25003 bytes ...che_activemq_artemis_jlibaio_LibaioContext.c | 44 +- .../activemq/artemis/jlibaio/LibaioContext.java | 2 +- .../activemq/artemis/jlibaio/LibaioFile.java | 3 - .../artemis/jlibaio/test/LibaioTest.java | 33 +- .../artemis/core/config/Configuration.java | 7 + .../core/config/impl/ConfigurationImpl.java | 14 + .../deployers/impl/FileConfigurationParser.java | 2 + .../impl/journal/DescribeJournal.java | 4 +- .../impl/journal/JournalStorageManager.java | 4 +- .../resources/schema/artemis-configuration.xsd | 8 + .../artemis/tests/util/ActiveMQTestBase.java | 8 +- docs/user-manual/en/configuration-index.md | 1 + docs/user-manual/en/persistence.md | 12 + docs/user-manual/en/tools.md | 157 +++++-- .../integration/client/ConsumerStuckTest.java | 4 +- .../integration/client/HangConsumerTest.java | 2 +- .../integration/client/JournalCrashTest.java | 2 +- .../tests/integration/client/PagingTest.java | 2 +- .../client/RedeliveryConsumerTest.java | 2 +- .../integration/cluster/bridge/BridgeTest.java | 2 +- .../journal/NIOJournalCompactTest.java | 4 +- .../journal/ValidateTransactionHealthTest.java | 2 +- .../journal/JournalImplTestUnit.java | 6 +- .../storage/PersistMultiThreadTest.java | 440 +++++++++++++++++++ .../storage/SendReceiveMultiThreadTest.java | 297 +++++++++++++ .../stress/journal/AddAndRemoveStressTest.java | 12 +- .../JournalCleanupCompactStressTest.java | 2 +- .../stress/journal/MixupCompactorTestBase.java | 2 +- .../NIOMultiThreadCompactorStressTest.java | 2 +- .../journal/impl/AlignedJournalImplTest.java | 10 +- .../core/journal/impl/JournalAsyncTest.java | 2 +- .../core/journal/impl/JournalImplTestBase.java | 14 +- .../core/journal/impl/JournalImplTestUnit.java | 109 ++++- .../impl/BatchIDGeneratorUnitTest.java | 2 +- 46 files changed, 1197 insertions(+), 197 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/CompactJournal.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/CompactJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/CompactJournal.java index eb4a352..a638963 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/CompactJournal.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/CompactJournal.java @@ -53,7 +53,7 @@ public final class CompactJournal extends LockAbstract { final IOCriticalErrorListener listener) throws Exception { NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener, 1); - JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); + JournalImpl journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); journal.start(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java index f2c20d9..e39b055 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java @@ -107,7 +107,7 @@ public class DecodeJournal extends LockAbstract { NIOSequentialFileFactory nio = new NIOSequentialFileFactory(new File(directory), null, 1); - JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); + JournalImpl journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); if (journal.orderFiles().size() != 0) { throw new IllegalStateException("Import needs to create a brand new journal"); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java index 38e599b..bc0d39e 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java @@ -101,7 +101,7 @@ public class EncodeJournal extends LockAbstract { final PrintStream out) throws Exception { NIOSequentialFileFactory nio = new NIOSequentialFileFactory(new File(directory), null, 1); - JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); + JournalImpl journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); List<JournalFile> files = journal.orderFiles(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java index d0221f0..8994c72 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java @@ -310,7 +310,7 @@ public final class XmlDataExporter extends LockAbstract { private void getJmsBindings() throws Exception { SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1); - Journal jmsJournal = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1); + Journal jmsJournal = new JournalImpl(1024 * 1024, 2, 2, config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1); jmsJournal.start(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml index ed1fbd2..49845f7 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml @@ -43,7 +43,9 @@ under the License. <large-messages-directory>${data.dir}/large-messages</large-messages-directory> - <journal-min-files>10</journal-min-files> + <journal-min-files>2</journal-min-files> + + <journal-pool-files>-1</journal-pool-files> ${journal-buffer.settings} ${connector-config.settings} <acceptors> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 7b27dcf..1322100 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -99,6 +99,7 @@ public final class ActiveMQDefaultConfiguration { // These defaults are applied depending on whether the journal type // is NIO or AIO. private static int DEFAULT_JOURNAL_MAX_IO_AIO = 500; + private static int DEFAULT_JOURNAL_POOL_FILES = -1; private static int DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO = ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO; private static int DEFAULT_JOURNAL_BUFFER_SIZE_AIO = ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO; private static int DEFAULT_JOURNAL_MAX_IO_NIO = 1; @@ -680,6 +681,14 @@ public final class ActiveMQDefaultConfiguration { } /** + * How many journal files can be resued + * @return + */ + public static int getDefaultJournalPoolFiles() { + return DEFAULT_JOURNAL_POOL_FILES; + } + + /** * The percentage of live data on which we consider compacting the journal */ public static int getDefaultJournalCompactPercentage() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java index 55bc821..f47f1bb 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java @@ -86,7 +86,7 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager { SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1); - Journal localJMS = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1); + Journal localJMS = new JournalImpl(1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1); if (replicator != null) { jmsJournal = new ReplicatedJournal((byte) 2, localJMS, replicator); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java index 9bac49d..efeeb2e 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java @@ -79,9 +79,10 @@ public class AIOSequentialFile extends AbstractSequentialFile { @Override public int getAlignment() { - checkOpened(); - - return aioFile.getBlockSize(); + // TODO: get the alignment from the file system, but we have to cache this, we can't call it every time + /* checkOpened(); + return aioFile.getBlockSize(); */ + return 512; } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java index e1d69ab..a46c0b6 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java @@ -73,6 +73,8 @@ public class JournalFilesRepository { private final int minFiles; + private final int poolSize; + private final int fileSize; private final String filePrefix; @@ -104,7 +106,8 @@ public class JournalFilesRepository { final int userVersion, final int maxAIO, final int fileSize, - final int minFiles) { + final int minFiles, + final int poolSize) { if (filePrefix == null) { throw new IllegalArgumentException("filePrefix cannot be null"); } @@ -120,6 +123,7 @@ public class JournalFilesRepository { this.fileExtension = fileExtension; this.minFiles = minFiles; this.fileSize = fileSize; + this.poolSize = poolSize; this.userVersion = userVersion; this.journal = journal; } @@ -358,7 +362,7 @@ public class JournalFilesRepository { ActiveMQJournalLogger.LOGGER.deletingFile(file); file.getFile().delete(); } - else if (!checkDelete || (freeFilesCount.get() + dataFiles.size() + 1 + openedFiles.size() < minFiles)) { + else if (!checkDelete || (freeFilesCount.get() + dataFiles.size() + 1 + openedFiles.size() < poolSize) || (poolSize < 0)) { // Re-initialise it if (JournalFilesRepository.trace) { @@ -378,7 +382,7 @@ public class JournalFilesRepository { if (trace) { ActiveMQJournalLogger.LOGGER.trace("DataFiles.size() = " + dataFiles.size()); ActiveMQJournalLogger.LOGGER.trace("openedFiles.size() = " + openedFiles.size()); - ActiveMQJournalLogger.LOGGER.trace("minfiles = " + minFiles); + ActiveMQJournalLogger.LOGGER.trace("minfiles = " + minFiles + ", poolSize = " + poolSize); ActiveMQJournalLogger.LOGGER.trace("Free Files = " + freeFilesCount.get()); ActiveMQJournalLogger.LOGGER.trace("File " + file + " being deleted as freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() (" + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 4b72992..14b6d92 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -40,21 +40,20 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.io.SequentialFile; -import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.TestableJournal; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; @@ -193,7 +192,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // Lock used during the append of records // This lock doesn't represent a global lock. // After a record is appended, the usedFile can't be changed until the positives and negatives are updated - private final ReentrantLock lockAppend = new ReentrantLock(); + private final Object lockAppend = new Object(); /** * We don't lock the journal during the whole compacting operation. During compacting we only @@ -209,23 +208,27 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal private volatile JournalState state = JournalState.STOPPED; + private volatile int compactCount = 0; + private final Reclaimer reclaimer = new Reclaimer(); // Constructors -------------------------------------------------- public JournalImpl(final int fileSize, final int minFiles, + final int poolSize, final int compactMinFiles, final int compactPercentage, final SequentialFileFactory fileFactory, final String filePrefix, final String fileExtension, final int maxAIO) { - this(fileSize, minFiles, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, 0); + this(fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, 0); } public JournalImpl(final int fileSize, final int minFiles, + final int poolSize, final int compactMinFiles, final int compactPercentage, final SequentialFileFactory fileFactory, @@ -234,6 +237,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final int maxAIO, final int userVersion) { super(fileFactory.isSupportsCallbacks(), fileSize); + if (fileSize % fileFactory.getAlignment() != 0) { throw new IllegalArgumentException("Invalid journal-file-size " + fileSize + ", It should be multiple of " + fileFactory.getAlignment()); @@ -257,7 +261,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal this.fileFactory = fileFactory; - filesRepository = new JournalFilesRepository(fileFactory, this, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles); + filesRepository = new JournalFilesRepository(fileFactory, this, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles, poolSize); this.userVersion = userVersion; } @@ -715,8 +719,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal callback.storeLineUp(); } - lockAppend.lock(); - try { + synchronized (lockAppend) { JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback); if (JournalImpl.TRACE_RECORDS) { @@ -729,9 +732,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize())); } - finally { - lockAppend.unlock(); - } } finally { journalLock.readLock().unlock(); @@ -763,8 +763,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal callback.storeLineUp(); } - lockAppend.lock(); - try { + synchronized (lockAppend) { JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback); if (JournalImpl.TRACE_RECORDS) { @@ -784,9 +783,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize()); } } - finally { - lockAppend.unlock(); - } } finally { journalLock.readLock().unlock(); @@ -821,8 +817,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal callback.storeLineUp(); } - lockAppend.lock(); - try { + synchronized (lockAppend) { JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback); if (JournalImpl.TRACE_RECORDS) { @@ -839,9 +834,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } - finally { - lockAppend.unlock(); - } } finally { journalLock.readLock().unlock(); @@ -862,8 +854,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal JournalTransaction tx = getTransactionInfo(txID); - lockAppend.lock(); - try { + synchronized (lockAppend) { JournalFile usedFile = appendRecord(addRecord, false, false, tx, null); if (JournalImpl.TRACE_RECORDS) { @@ -878,9 +869,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal tx.addPositive(usedFile, id, addRecord.getEncodeSize()); } - finally { - lockAppend.unlock(); - } } finally { journalLock.readLock().unlock(); @@ -911,8 +899,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal JournalTransaction tx = getTransactionInfo(txID); - lockAppend.lock(); - try { + synchronized (lockAppend) { JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null); if (JournalImpl.TRACE_RECORDS) { @@ -927,9 +914,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize()); } - finally { - lockAppend.unlock(); - } } finally { journalLock.readLock().unlock(); @@ -949,8 +933,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal JournalTransaction tx = getTransactionInfo(txID); - lockAppend.lock(); - try { + synchronized (lockAppend) { JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null); if (JournalImpl.TRACE_RECORDS) { @@ -963,9 +946,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal tx.addNegative(usedFile, id); } - finally { - lockAppend.unlock(); - } } finally { journalLock.readLock().unlock(); @@ -1003,8 +983,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal callback.storeLineUp(); } - lockAppend.lock(); - try { + synchronized (lockAppend) { JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback); if (JournalImpl.TRACE_RECORDS) { @@ -1013,9 +992,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal tx.prepare(usedFile); } - finally { - lockAppend.unlock(); - } } finally { @@ -1053,8 +1029,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal callback.storeLineUp(); } - lockAppend.lock(); - try { + synchronized (lockAppend) { JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback); if (JournalImpl.TRACE_RECORDS) { @@ -1063,9 +1038,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal tx.commit(usedFile); } - finally { - lockAppend.unlock(); - } } finally { @@ -1094,15 +1066,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal callback.storeLineUp(); } - lockAppend.lock(); - try { + synchronized (lockAppend) { JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback); tx.rollback(usedFile); } - finally { - lockAppend.unlock(); - } } finally { @@ -1289,11 +1257,17 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal * Note: only synchronized methods on journal are methods responsible for the life-cycle such as * stop, start records will still come as this is being executed */ + public synchronized void compact() throws Exception { + if (compactor != null) { throw new IllegalStateException("There is pending compacting operation"); } + if (ActiveMQJournalLogger.LOGGER.isDebugEnabled()) { + ActiveMQJournalLogger.LOGGER.debug("JournalImpl::compact compacting journal " + (++compactCount)); + } + compactorLock.writeLock().lock(); try { ArrayList<JournalFile> dataFilesToProcess = new ArrayList<>(filesRepository.getDataFilesCount()); @@ -2067,14 +2041,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal public void forceMoveNextFile() throws Exception { journalLock.readLock().lock(); try { - lockAppend.lock(); - try { + synchronized (lockAppend) { moveNextFile(false); debugWait(); } - finally { - lockAppend.unlock(); - } } finally { journalLock.readLock().unlock(); @@ -2131,9 +2101,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal journalLock.writeLock().lock(); try { - lockAppend.lock(); - - try { + synchronized (lockAppend) { setJournalState(JournalState.STOPPED); @@ -2172,9 +2140,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal currentFile = null; } - finally { - lockAppend.unlock(); - } } finally { journalLock.writeLock().unlock(); @@ -2666,33 +2631,31 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal @Override public void run() { - lockAppend.lock(); - try { + synchronized (lockAppend) { + try { - final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]); + final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]); - JournalInternalRecord blastRecord = new JournalInternalRecord() { + JournalInternalRecord blastRecord = new JournalInternalRecord() { - @Override - public int getEncodeSize() { - return byteEncoder.getEncodeSize(); - } + @Override + public int getEncodeSize() { + return byteEncoder.getEncodeSize(); + } - @Override - public void encode(final ActiveMQBuffer buffer) { - byteEncoder.encode(buffer); - } - }; + @Override + public void encode(final ActiveMQBuffer buffer) { + byteEncoder.encode(buffer); + } + }; - for (int i = 0; i < pages; i++) { - appendRecord(blastRecord, false, false, null, null); + for (int i = 0; i < pages; i++) { + appendRecord(blastRecord, false, false, null, null); + } + } + catch (Exception e) { + ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e); } - } - catch (Exception e) { - ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e); - } - finally { - lockAppend.unlock(); } } } @@ -2863,4 +2826,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal throw new RuntimeException(e); } } + + /** + * For tests only + */ + public int getCompactCount() { + return compactCount; + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-native/bin/libartemis-native-32.so ---------------------------------------------------------------------- diff --git a/artemis-native/bin/libartemis-native-32.so b/artemis-native/bin/libartemis-native-32.so index 540da4b..7a7f451 100755 Binary files a/artemis-native/bin/libartemis-native-32.so and b/artemis-native/bin/libartemis-native-32.so differ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-native/bin/libartemis-native-64.so ---------------------------------------------------------------------- diff --git a/artemis-native/bin/libartemis-native-64.so b/artemis-native/bin/libartemis-native-64.so old mode 100755 new mode 100644 index 33b9b1c..a938c70 Binary files a/artemis-native/bin/libartemis-native-64.so and b/artemis-native/bin/libartemis-native-64.so differ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c b/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c index 1e08041..0d09ed1 100644 --- a/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c +++ b/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c @@ -63,6 +63,9 @@ struct io_control { int dumbWriteHandler = 0; char dumbPath[PATH_MAX]; +#define ONE_MEGA 1048576l +void * oneMegaBuffer = 0; + jclass submitClass = NULL; jmethodID errorMethod = NULL; @@ -121,6 +124,12 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) { return JNI_ERR; } else { + if (posix_memalign(&oneMegaBuffer, 512, ONE_MEGA) != 0) + { + fprintf(stderr, "Could not allocate the 1 Mega Buffer for initializing files\n"); + return JNI_ERR; + } + memset(oneMegaBuffer, 0, ONE_MEGA); sprintf (dumbPath, "%s/artemisJLHandler_XXXXXX", P_tmpdir); dumbWriteHandler = mkstemp (dumbPath); @@ -219,6 +228,8 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) { } else { closeDumbHandlers(); + free(oneMegaBuffer); + // delete global references so the GC can collect them if (runtimeExceptionClass != NULL) { (*env)->DeleteGlobalRef(env, runtimeExceptionClass); @@ -757,17 +768,34 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_fa JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_fill (JNIEnv * env, jclass clazz, jint fd, jlong size) { - void * preAllocBuffer = 0; - if (posix_memalign(&preAllocBuffer, 512, size) != 0) + + int i; + int blocks = size / ONE_MEGA; + int rest = size % ONE_MEGA; + + #ifdef DEBUG + fprintf (stderr, "blocks = %d, rest=%d\n", blocks, rest); + #endif + + lseek (fd, 0, SEEK_SET); + for (i = 0; i < blocks; i++) { - throwOutOfMemoryError(env); - return; + if (write(fd, oneMegaBuffer, ONE_MEGA) < 0) + { + throwIOException(env, "Cannot initialize file"); + return; + } + } + + if (rest != 0l) + { + if (write(fd, oneMegaBuffer, rest) < 0) + { + throwIOException(env, "Cannot initialize file"); + return; + } } - memset(preAllocBuffer, 0, size); - lseek (fd, 0, SEEK_SET); - write(fd, preAllocBuffer, size); lseek (fd, 0, SEEK_SET); - free (preAllocBuffer); } JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_memsetBuffer http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java index 9eb675d..f7cc0d3 100644 --- a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java +++ b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java @@ -49,7 +49,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable { * <br> * Or else the native module won't be loaded because of version mismatches */ - private static final int EXPECTED_NATIVE_VERSION = 3; + private static final int EXPECTED_NATIVE_VERSION = 5; private static boolean loaded = false; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioFile.java ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioFile.java b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioFile.java index 27243fe..b1520d0 100644 --- a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioFile.java +++ b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioFile.java @@ -48,9 +48,6 @@ public final class LibaioFile<Callback extends SubmitInfo> { return LibaioContext.lock(fd); } - /** - * {@inheritDoc} - */ public void close() throws IOException { open = false; LibaioContext.close(fd); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java ---------------------------------------------------------------------- diff --git a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java index af25dee..617553c 100644 --- a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java +++ b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java @@ -93,12 +93,21 @@ public class LibaioTest { } @Test - public void testInitAndFallocate() throws Exception { + public void testInitAndFallocate10M() throws Exception { + testInit(10 * 1024 * 1024); + } + + @Test + public void testInitAndFallocate10M100K() throws Exception { + testInit(10 * 1024 * 1024 + 100 * 1024); + } + + private void testInit(int size) throws IOException { LibaioFile fileDescriptor = control.openFile(temporaryFolder.newFile("test.bin"), true); - fileDescriptor.fallocate(1024 * 1024); + fileDescriptor.fallocate(size); - ByteBuffer buffer = fileDescriptor.newBuffer(1024 * 1024); - fileDescriptor.read(0, 1024 * 1024, buffer, new TestInfo()); + ByteBuffer buffer = fileDescriptor.newBuffer(size); + fileDescriptor.read(0, size, buffer, new TestInfo()); TestInfo[] callbacks = new TestInfo[1]; control.poll(callbacks, 1, 1); @@ -108,11 +117,11 @@ public class LibaioTest { buffer.position(0); LibaioFile fileDescriptor2 = control.openFile(temporaryFolder.newFile("test2.bin"), true); - fileDescriptor2.fill(1024 * 1024); - fileDescriptor2.read(0, 1024 * 1024, buffer, new TestInfo()); + fileDescriptor2.fill(size); + fileDescriptor2.read(0, size, buffer, new TestInfo()); control.poll(callbacks, 1, 1); - for (int i = 0; i < 1024 * 1024; i++) { + for (int i = 0; i < size; i++) { Assert.assertEquals(0, buffer.get()); } @@ -120,6 +129,16 @@ public class LibaioTest { } @Test + public void testInitAndFallocate10K() throws Exception { + testInit(10 * 1024); + } + + @Test + public void testInitAndFallocate20K() throws Exception { + testInit(20 * 1024); + } + + @Test public void testSubmitWriteOnTwoFiles() throws Exception { File file1 = temporaryFolder.newFile("test.bin"); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 8d5832f..6dfd2b8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -546,6 +546,13 @@ public interface Configuration { */ Configuration setJournalCompactMinFiles(int minFiles); + /** Number of files that would be acceptable to keep on a pool. Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_POOL_SIZE}.*/ + int getJournalPoolFiles(); + + /** Number of files that would be acceptable to keep on a pool. Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_POOL_SIZE}.*/ + Configuration setJournalPoolFiles(int poolSize); + + /** * Returns the percentage of live data before compacting the journal. <br> * Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_COMPACT_PERCENTAGE}. http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index fad446f..e4b9f72 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -151,6 +151,8 @@ public class ConfigurationImpl implements Configuration, Serializable { protected int journalFileSize = ActiveMQDefaultConfiguration.getDefaultJournalFileSize(); + protected int journalPoolFiles = ActiveMQDefaultConfiguration.getDefaultJournalPoolFiles(); + protected int journalMinFiles = ActiveMQDefaultConfiguration.getDefaultJournalMinFiles(); // AIO and NIO need different values for these attributes @@ -670,6 +672,18 @@ public class ConfigurationImpl implements Configuration, Serializable { } @Override + public int getJournalPoolFiles() { + return journalPoolFiles; + } + + @Override + public Configuration setJournalPoolFiles(int poolSize) { + this.journalPoolFiles = poolSize; + return this; + } + + + @Override public int getJournalMinFiles() { return journalMinFiles; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index be1d6dd..caee9bf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -470,6 +470,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { config.setJournalMinFiles(getInteger(e, "journal-min-files", config.getJournalMinFiles(), Validators.GT_ZERO)); + config.setJournalPoolFiles(getInteger(e, "journal-pool-files", config.getJournalPoolFiles(), Validators.MINUS_ONE_OR_GT_ZERO)); + config.setJournalCompactMinFiles(getInteger(e, "journal-compact-min-files", config.getJournalCompactMinFiles(), Validators.GE_ZERO)); config.setJournalCompactPercentage(getInteger(e, "journal-compact-percentage", config.getJournalCompactPercentage(), Validators.PERCENTAGE)); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java index 6d96143..d41f0ed 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java @@ -106,7 +106,7 @@ public final class DescribeJournal { SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir, null, 1); - JournalImpl bindings = new JournalImpl(1024 * 1024, 2, -1, 0, bindingsFF, "activemq-bindings", "bindings", 1); + JournalImpl bindings = new JournalImpl(1024 * 1024, 2, 2, -1, 0, bindingsFF, "activemq-bindings", "bindings", 1); describeJournal(bindingsFF, bindings, bindingsDir); } @@ -117,7 +117,7 @@ public final class DescribeJournal { // Will use only default values. The load function should adapt to anything different ConfigurationImpl defaultValues = new ConfigurationImpl(); - JournalImpl messagesJournal = new JournalImpl(defaultValues.getJournalFileSize(), defaultValues.getJournalMinFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1); + JournalImpl messagesJournal = new JournalImpl(defaultValues.getJournalFileSize(), defaultValues.getJournalMinFiles(), defaultValues.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1); return describeJournal(messagesFF, messagesJournal, messagesDir); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index c1f644f..fbf9655 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -231,7 +231,7 @@ public class JournalStorageManager implements StorageManager { SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO()); - Journal localBindings = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1); + Journal localBindings = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1); bindingsJournal = localBindings; originalBindingsJournal = localBindings; @@ -255,7 +255,7 @@ public class JournalStorageManager implements StorageManager { idGenerator = new BatchingIDGenerator(0, JournalStorageManager.CHECKPOINT_BATCH_SIZE, this); - Journal localMessage = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO()); + Journal localMessage = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO()); messageJournal = localMessage; originalMessageJournal = localMessage; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-server/src/main/resources/schema/artemis-configuration.xsd ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index a76395c..0524348 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -593,6 +593,14 @@ </xsd:annotation> </xsd:element> + <xsd:element name="journal-pool-files" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0"> + <xsd:annotation> + <xsd:documentation> + how many journal files to pre-create + </xsd:documentation> + </xsd:annotation> + </xsd:element> + <xsd:element name="journal-compact-percentage" type="xsd:int" default="30" maxOccurs="1" minOccurs="0"> <xsd:annotation> <xsd:documentation> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 34f1ca9..a75b73c 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -1630,7 +1630,7 @@ public abstract class ActiveMQTestBase extends Assert { try { SequentialFileFactory messagesFF = new NIOSequentialFileFactory(new File(getJournalDir()), null, 1); - messagesJournal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1); + messagesJournal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1); final List<RecordInfo> committedRecords = new LinkedList<>(); final List<PreparedTransactionInfo> preparedTransactions = new LinkedList<>(); @@ -1664,7 +1664,7 @@ public abstract class ActiveMQTestBase extends Assert { final HashMap<Integer, AtomicInteger> recordsType = new HashMap<>(); SequentialFileFactory messagesFF = new NIOSequentialFileFactory(config.getJournalLocation(), null, 1); - JournalImpl messagesJournal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1); + JournalImpl messagesJournal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1); List<JournalFile> filesToRead = messagesJournal.orderFiles(); for (JournalFile file : filesToRead) { @@ -1701,11 +1701,11 @@ public abstract class ActiveMQTestBase extends Assert { if (messageJournal) { ff = new NIOSequentialFileFactory(config.getJournalLocation(), null, 1); - journal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), 0, 0, ff, "activemq-data", "amq", 1); + journal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), 0, 0, ff, "activemq-data", "amq", 1); } else { ff = new NIOSequentialFileFactory(config.getBindingsLocation(), null, 1); - journal = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), ff, "activemq-bindings", "bindings", 1); + journal = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), ff, "activemq-bindings", "bindings", 1); } journal.start(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/docs/user-manual/en/configuration-index.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md index 72b4b37..d819d13 100644 --- a/docs/user-manual/en/configuration-index.md +++ b/docs/user-manual/en/configuration-index.md @@ -54,6 +54,7 @@ Name | Description [journal-file-size](persistence.md) | the size (in bytes) of each journal file. Default=10485760 (10 MB) [journal-max-io](persistence.md#configuring.message.journal.journal-max-io) | the maximum number of write requests that can be in the AIO queue at any one time. Default is 500 for AIO and 1 for NIO. [journal-min-files](persistence.md#configuring.message.journal.journal-min-files) | how many journal files to pre-create. Default=2 +[journal-pool-files](persistence.md#configuring.message.journal.journal-pool-files) | -1 (default) means no Limit. The system will create as many files as needed however when reclaiming files it will shrink back to the `journal-pool-files` [journal-sync-non-transactional](persistence.md) | if true wait for non transaction data to be synced to the journal before returning response to client. Default=true [journal-sync-transactional](persistence.md) | if true wait for transaction data to be synchronized to the journal before returning response to client. Default=true [journal-type](persistence.md) | the type of journal to use. Default=ASYNCIO http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/docs/user-manual/en/persistence.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/persistence.md b/docs/user-manual/en/persistence.md index a9269b1..ccf14ec 100644 --- a/docs/user-manual/en/persistence.md +++ b/docs/user-manual/en/persistence.md @@ -218,6 +218,18 @@ The message journal is configured using the following attributes in steady state you should tune this number of files to match that total amount of data. +- `journal-pool-files` + + The system will create as many files as needed however when reclaiming files + it will shrink back to the `journal-pool-files`. + + The default to this parameter is -1, meaning it will never delete files on the journal once created. + + Notice that the system can't grow infinitely as you are still required to use paging for destinations that can + grow indefinitely. + + Notice: in case you get too many files you can use [compacting](tools.md). + - `journal-max-io` Write requests are queued up before being submitted to the system http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/docs/user-manual/en/tools.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/tools.md b/docs/user-manual/en/tools.md index 218f6a7..b36e132 100644 --- a/docs/user-manual/en/tools.md +++ b/docs/user-manual/en/tools.md @@ -47,24 +47,31 @@ OPTIONS For a full list of data tools commands available use: ``` -$ ./artemis help data NAME - artemis data - data tools like (print|exp|imp|exp|encode|decode) - (example ./artemis data print) + artemis data - data tools group + (print|exp|imp|exp|encode|decode|compact) (example ./artemis data print) SYNOPSIS artemis data - artemis data decode [--prefix <prefix>] [--directory <directory>] - [--suffix <suffix>] [--file-size <size>] - artemis data encode [--prefix <prefix>] [--directory <directory>] - [--suffix <suffix>] [--file-size <size>] - artemis data exp [--bindings <binding>] - [--large-messages <largeMessges>] [--paging <paging>] - [--journal <journal>] - artemis data imp [--password <password>] [--port <port>] [--host <host>] - [--user <user>] [--transaction] - artemis data print [--bindings <binding>] [--paging <paging>] - [--journal <journal>] + artemis data compact [--broker <brokerConfig>] [--verbose] + [--paging <paging>] [--journal <journal>] + [--large-messages <largeMessges>] [--bindings <binding>] + artemis data decode [--broker <brokerConfig>] [--suffix <suffix>] + [--verbose] [--paging <paging>] [--prefix <prefix>] [--file-size <size>] + [--directory <directory>] --input <input> [--journal <journal>] + [--large-messages <largeMessges>] [--bindings <binding>] + artemis data encode [--directory <directory>] [--broker <brokerConfig>] + [--suffix <suffix>] [--verbose] [--paging <paging>] [--prefix <prefix>] + [--file-size <size>] [--journal <journal>] + [--large-messages <largeMessges>] [--bindings <binding>] + artemis data exp [--broker <brokerConfig>] [--verbose] + [--paging <paging>] [--journal <journal>] + [--large-messages <largeMessges>] [--bindings <binding>] + artemis data imp [--host <host>] [--verbose] [--port <port>] + [--password <password>] [--transaction] --input <input> [--user <user>] + artemis data print [--broker <brokerConfig>] [--verbose] + [--paging <paging>] [--journal <journal>] + [--large-messages <largeMessges>] [--bindings <binding>] COMMANDS With no arguments, Display help information @@ -73,73 +80,145 @@ COMMANDS Print data records information (WARNING: don't use while a production server is running) - With --bindings option, The folder used for bindings (default - ../data/bindings) + With --broker option, This would override the broker configuration + from the bootstrap - With --paging option, The folder used for paging (default - ../data/paging) + With --verbose option, Adds more information on the execution + + With --paging option, The folder used for paging (default from + broker.xml) With --journal option, The folder used for messages journal (default - ../data/journal) + from broker.xml) + + With --large-messages option, The folder used for large-messages + (default from broker.xml) + + With --bindings option, The folder used for bindings (default from + broker.xml) exp Export all message-data using an XML that could be interpreted by any system. - With --bindings option, The folder used for bindings (default - ../data/bindings) + With --broker option, This would override the broker configuration + from the bootstrap - With --large-messages option, The folder used for large-messages - (default ../data/largemessages) + With --verbose option, Adds more information on the execution - With --paging option, The folder used for paging (default - ../data/paging) + With --paging option, The folder used for paging (default from + broker.xml) With --journal option, The folder used for messages journal (default - ../data/journal) + from broker.xml) + + With --large-messages option, The folder used for large-messages + (default from broker.xml) + + With --bindings option, The folder used for bindings (default from + broker.xml) imp Import all message-data using an XML that could be interpreted by any system. - With --password option, User name used to import the data. (default - null) - - With --port option, The port used to import the data (default 61616) - With --host option, The host used to import the data (default localhost) - With --user option, User name used to import the data. (default + With --verbose option, Adds more information on the execution + + With --port option, The port used to import the data (default 61616) + + With --password option, User name used to import the data. (default null) With --transaction option, If this is set to true you will need a whole transaction to commit at the end. (default false) + With --input option, The input file name (default=exp.dmp) + + With --user option, User name used to import the data. (default + null) + decode Decode a journal's internal format into a new journal set of files - With --prefix option, The journal prefix (default activemq-datal) - - With --directory option, The journal folder (default - ../data/journal) + With --broker option, This would override the broker configuration + from the bootstrap With --suffix option, The journal suffix (default amq) + With --verbose option, Adds more information on the execution + + With --paging option, The folder used for paging (default from + broker.xml) + + With --prefix option, The journal prefix (default activemq-data) + With --file-size option, The journal size (default 10485760) + With --directory option, The journal folder (default journal folder + from broker.xml) + + With --input option, The input file name (default=exp.dmp) + + With --journal option, The folder used for messages journal (default + from broker.xml) + + With --large-messages option, The folder used for large-messages + (default from broker.xml) + + With --bindings option, The folder used for bindings (default from + broker.xml) + encode Encode a set of journal files into an internal encoded data format - With --prefix option, The journal prefix (default activemq-datal) + With --directory option, The journal folder (default the journal + folder from broker.xml) - With --directory option, The journal folder (default - ../data/journal) + With --broker option, This would override the broker configuration + from the bootstrap With --suffix option, The journal suffix (default amq) + With --verbose option, Adds more information on the execution + + With --paging option, The folder used for paging (default from + broker.xml) + + With --prefix option, The journal prefix (default activemq-data) + With --file-size option, The journal size (default 10485760) + With --journal option, The folder used for messages journal (default + from broker.xml) + + With --large-messages option, The folder used for large-messages + (default from broker.xml) + + With --bindings option, The folder used for bindings (default from + broker.xml) + + compact + Compacts the journal of a non running server + + With --broker option, This would override the broker configuration + from the bootstrap + + With --verbose option, Adds more information on the execution + + With --paging option, The folder used for paging (default from + broker.xml) + + With --journal option, The folder used for messages journal (default + from broker.xml) + + With --large-messages option, The folder used for large-messages + (default from broker.xml) + + With --bindings option, The folder used for bindings (default from + broker.xml) ``` \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerStuckTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerStuckTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerStuckTest.java index 2b706c0..747d954 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerStuckTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerStuckTest.java @@ -54,7 +54,7 @@ public class ConsumerStuckTest extends ActiveMQTestBase { @Test public void testClientStuckTest() throws Exception { - ServerLocator locator = createNettyNonHALocator().setConnectionTTL(1000).setClientFailureCheckPeriod(100).setConsumerWindowSize(10 * 1024 * 1024); + ServerLocator locator = createNettyNonHALocator().setConnectionTTL(1000).setClientFailureCheckPeriod(100).setConsumerWindowSize(10 * 1024 * 1024).setCallTimeout(1000); ClientSessionFactory sf = locator.createSessionFactory(); ((ClientSessionFactoryImpl) sf).stopPingingAfterOne(); @@ -146,7 +146,7 @@ public class ConsumerStuckTest extends ActiveMQTestBase { @Test public void testClientStuckTestWithDirectDelivery() throws Exception { - ServerLocator locator = createNettyNonHALocator().setConnectionTTL(1000).setClientFailureCheckPeriod(100).setConsumerWindowSize(10 * 1024 * 1024); + ServerLocator locator = createNettyNonHALocator().setConnectionTTL(1000).setClientFailureCheckPeriod(100).setConsumerWindowSize(10 * 1024 * 1024).setCallTimeout(1000); ClientSessionFactory sf = locator.createSessionFactory(); ((ClientSessionFactoryImpl) sf).stopPingingAfterOne(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 121972d..8199b4c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -427,7 +427,7 @@ public class HangConsumerTest extends ActiveMQTestBase { SequentialFileFactory messagesFF = new NIOSequentialFileFactory(server.getConfiguration().getBindingsLocation(), null, 1); - JournalImpl messagesJournal = new JournalImpl(1024 * 1024, 2, 0, 0, messagesFF, "activemq-bindings", "bindings", 1); + JournalImpl messagesJournal = new JournalImpl(1024 * 1024, 2, 2, 0, 0, messagesFF, "activemq-bindings", "bindings", 1); messagesJournal.start(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JournalCrashTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JournalCrashTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JournalCrashTest.java index 91ac760..774fb1b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JournalCrashTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JournalCrashTest.java @@ -191,7 +191,7 @@ public class JournalCrashTest extends ActiveMQTestBase { */ private void printJournal() throws Exception { NIOSequentialFileFactory factory = new NIOSequentialFileFactory(new File(getJournalDir()), 100); - JournalImpl journal = new JournalImpl(ActiveMQDefaultConfiguration.getDefaultJournalFileSize(), 2, 0, 0, factory, "activemq-data", "amq", 100); + JournalImpl journal = new JournalImpl(ActiveMQDefaultConfiguration.getDefaultJournalFileSize(), 2, 2, 0, 0, factory, "activemq-data", "amq", 100); ArrayList<RecordInfo> records = new ArrayList<>(); ArrayList<PreparedTransactionInfo> transactions = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java index 94a9dbe..c8661b2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java @@ -1496,7 +1496,7 @@ public class PagingTest extends ActiveMQTestBase { List<PreparedTransactionInfo> list = new ArrayList<>(); - JournalImpl jrn = new JournalImpl(config.getJournalFileSize(), 2, 0, 0, new NIOSequentialFileFactory(server.getConfiguration().getJournalLocation(), 1), "activemq-data", "amq", 1); + JournalImpl jrn = new JournalImpl(config.getJournalFileSize(), 2, 2, 0, 0, new NIOSequentialFileFactory(server.getConfiguration().getJournalLocation(), 1), "activemq-data", "amq", 1); jrn.start(); jrn.load(records, list, null); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RedeliveryConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RedeliveryConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RedeliveryConsumerTest.java index e5a52bd..f2e64b7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RedeliveryConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RedeliveryConsumerTest.java @@ -266,7 +266,7 @@ public class RedeliveryConsumerTest extends ActiveMQTestBase { server.stop(); - JournalImpl journal = new JournalImpl(server.getConfiguration().getJournalFileSize(), 2, 0, 0, new NIOSequentialFileFactory(server.getConfiguration().getJournalLocation(), 1), "activemq-data", "amq", 1); + JournalImpl journal = new JournalImpl(server.getConfiguration().getJournalFileSize(), 2, 2, 0, 0, new NIOSequentialFileFactory(server.getConfiguration().getJournalLocation(), 1), "activemq-data", "amq", 1); final AtomicInteger updates = new AtomicInteger(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java index a3c5c6e..8ba08a6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java @@ -1756,7 +1756,7 @@ public class BridgeTest extends ActiveMQTestBase { protected Map<Long, AtomicInteger> loadQueues(ActiveMQServer serverToInvestigate) throws Exception { SequentialFileFactory messagesFF = new NIOSequentialFileFactory(serverToInvestigate.getConfiguration().getJournalLocation(), 1); - JournalImpl messagesJournal = new JournalImpl(serverToInvestigate.getConfiguration().getJournalFileSize(), serverToInvestigate.getConfiguration().getJournalMinFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1); + JournalImpl messagesJournal = new JournalImpl(serverToInvestigate.getConfiguration().getJournalFileSize(), serverToInvestigate.getConfiguration().getJournalMinFiles(), serverToInvestigate.getConfiguration().getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1); List<RecordInfo> records = new LinkedList<>(); List<PreparedTransactionInfo> preparedTransactions = new LinkedList<>(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java index b91b89f..0cd9557 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java @@ -207,7 +207,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase { final byte recordType = (byte) 0; - journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO); + journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO); journal.start(); @@ -486,7 +486,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase { final CountDownLatch latchDone = new CountDownLatch(1); final CountDownLatch latchWait = new CountDownLatch(1); - journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) { + journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) { @Override protected SequentialFile createControlFile(final List<JournalFile> files, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java index 978406a..a88199a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java @@ -318,7 +318,7 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase { } public static JournalImpl createJournal(final String journalType, final String journalDir) { - JournalImpl journal = new JournalImpl(10485760, 2, 0, 0, ValidateTransactionHealthTest.getFactory(journalType, journalDir), "journaltst", "tst", 500); + JournalImpl journal = new JournalImpl(10485760, 2, 2, 0, 0, ValidateTransactionHealthTest.getFactory(journalType, journalDir), "journaltst", "tst", 500); return journal; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/journal/JournalImplTestUnit.java ---------------------------------------------------------------------- diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/journal/JournalImplTestUnit.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/journal/JournalImplTestUnit.java index ccd6933..5e2d69e 100644 --- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/journal/JournalImplTestUnit.java +++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/journal/JournalImplTestUnit.java @@ -180,7 +180,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { @Test public void testSpeedTransactional() throws Exception { - Journal journal = new JournalImpl(10 * 1024 * 1024, 10, 0, 0, getFileFactory(), "activemq-data", "amq", 5000); + Journal journal = new JournalImpl(10 * 1024 * 1024, 10, 10, 0, 0, getFileFactory(), "activemq-data", "amq", 5000); journal.start(); @@ -236,7 +236,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { JournalImplTestUnit.log.debug("num Files=" + numFiles); - Journal journal = new JournalImpl(10 * 1024 * 1024, numFiles, 0, 0, getFileFactory(), "activemq-data", "amq", 5000); + Journal journal = new JournalImpl(10 * 1024 * 1024, numFiles, numFiles, 0, 0, getFileFactory(), "activemq-data", "amq", 5000); journal.start(); @@ -259,7 +259,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { journal.stop(); - journal = new JournalImpl(10 * 1024 * 1024, numFiles, 0, 0, getFileFactory(), "activemq-data", "amq", 5000); + journal = new JournalImpl(10 * 1024 * 1024, numFiles, numFiles, 0, 0, getFileFactory(), "activemq-data", "amq", 5000); journal.start(); journal.load(new ArrayList<RecordInfo>(), null, null);
