http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java ---------------------------------------------------------------------- diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java new file mode 100644 index 0000000..32d8816 --- /dev/null +++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java @@ -0,0 +1,440 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.performance.storage; + +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.paging.PagingManager; +import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; +import org.apache.activemq.artemis.core.paging.impl.Page; +import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.replication.ReplicationManager; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.core.server.RouteContextList; +import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class PersistMultiThreadTest extends ActiveMQTestBase { + + final String DIRECTORY = "./target/journaltmp"; + + FakePagingStore fakePagingStore = new FakePagingStore(); + + @Test + public void testMultipleWrites() throws Exception { + deleteDirectory(new File(DIRECTORY)); + ActiveMQServer server = createServer(true); + server.getConfiguration().setJournalCompactMinFiles(ActiveMQDefaultConfiguration.getDefaultJournalCompactMinFiles()); + server.getConfiguration().setJournalCompactPercentage(ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage()); + server.getConfiguration().setJournalDirectory(DIRECTORY + "/journal"); + server.getConfiguration().setBindingsDirectory(DIRECTORY + "/bindings"); + server.getConfiguration().setPagingDirectory(DIRECTORY + "/paging"); + server.getConfiguration().setLargeMessagesDirectory(DIRECTORY + "/largemessage"); + + server.getConfiguration().setJournalFileSize(10 * 1024 * 1024); + server.getConfiguration().setJournalMinFiles(2); + server.getConfiguration().setJournalType(JournalType.ASYNCIO); + + server.start(); + + StorageManager storage = server.getStorageManager(); + + long msgID = storage.generateID(); + System.out.println("msgID=" + msgID); + + int NUMBER_OF_THREADS = 50; + int NUMBER_OF_MESSAGES = 5000; + + MyThread[] threads = new MyThread[NUMBER_OF_THREADS]; + + final CountDownLatch alignFlag = new CountDownLatch(NUMBER_OF_THREADS); + final CountDownLatch startFlag = new CountDownLatch(1); + final CountDownLatch finishFlag = new CountDownLatch(NUMBER_OF_THREADS); + + MyDeleteThread deleteThread = new MyDeleteThread("deleteThread", storage, NUMBER_OF_MESSAGES * NUMBER_OF_THREADS * 10); + deleteThread.start(); + + for (int i = 0; i < threads.length; i++) { + threads[i] = new MyThread("writer::" + i, storage, NUMBER_OF_MESSAGES, alignFlag, startFlag, finishFlag); + } + + for (MyThread t : threads) { + t.start(); + } + + alignFlag.await(); + + long startTime = System.currentTimeMillis(); + startFlag.countDown(); + + // I'm using a countDown to avoid measuring time spent on thread context from join. + // i.e. i want to measure as soon as the loops are done + finishFlag.await(); + long endtime = System.currentTimeMillis(); + + System.out.println("Time:: " + (endtime - startTime)); + + for (MyThread t : threads) { + t.join(); + Assert.assertEquals(0, t.errors.get()); + } + + deleteThread.join(); + Assert.assertEquals(0, deleteThread.errors.get()); + + } + + LinkedBlockingDeque<Long> deletes = new LinkedBlockingDeque<>(); + + class MyThread extends Thread { + + final StorageManager storage; + final int numberOfMessages; + final AtomicInteger errors = new AtomicInteger(0); + + final CountDownLatch align; + final CountDownLatch start; + final CountDownLatch finish; + + MyThread(String name, + StorageManager storage, + int numberOfMessages, + CountDownLatch align, + CountDownLatch start, + CountDownLatch finish) { + super(name); + this.storage = storage; + this.numberOfMessages = numberOfMessages; + this.align = align; + this.start = start; + this.finish = finish; + } + + public void run() { + try { + align.countDown(); + start.await(); + + long id = storage.generateID(); + long txID = storage.generateID(); + + // each thread will store a single message that will never be deleted, trying to force compacting to happen + storeMessage(txID, id); + storage.commit(txID); + + OperationContext ctx = storage.getContext(); + + for (int i = 0; i < numberOfMessages; i++) { + + txID = storage.generateID(); + + long[] messageID = new long[10]; + + for (int msgI = 0; msgI < 10; msgI++) { + id = storage.generateID(); + + messageID[msgI] = id; + + storeMessage(txID, id); + } + + storage.commit(txID); + ctx.waitCompletion(); + + for (long deleteID : messageID) { + deletes.add(deleteID); + } + } + } + catch (Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + finally { + finish.countDown(); + } + + } + + private void storeMessage(long txID, long id) throws Exception { + ServerMessage message = new ServerMessageImpl(id, 10 * 1024); + message.setPagingStore(fakePagingStore); + + message.getBodyBuffer().writeBytes(new byte[104]); + message.putStringProperty("hello", "" + id); + + storage.storeMessageTransactional(txID, message); + storage.storeReferenceTransactional(txID, 1, id); + + message.decrementRefCount(); + } + + } + + class MyDeleteThread extends Thread { + + final StorageManager storage; + final int numberOfMessages; + final AtomicInteger errors = new AtomicInteger(0); + + MyDeleteThread(String name, StorageManager storage, int numberOfMessages) { + super(name); + this.storage = storage; + this.numberOfMessages = numberOfMessages; + } + + public void run() { + long deletesNr = 0; + try { + + for (int i = 0; i < numberOfMessages; i++) { + if (i % 1000 == 0) { + // storage.getContext().waitCompletion(); + // deletesNr = 0; + // Thread.sleep(200); + } + deletesNr++; + Long deleteID = deletes.poll(10, TimeUnit.MINUTES); + if (deleteID == null) { + System.err.println("Coudn't poll delete info"); + errors.incrementAndGet(); + break; + } + + storage.storeAcknowledge(1, deleteID); + storage.deleteMessage(deleteID); + } + } + catch (Exception e) { + e.printStackTrace(System.out); + errors.incrementAndGet(); + } + finally { + System.err.println("Finished the delete loop!!!! deleted " + deletesNr); + } + } + } + + class FakePagingStore implements PagingStore { + + @Override + public SimpleString getAddress() { + return null; + } + + @Override + public int getNumberOfPages() { + return 0; + } + + @Override + public int getCurrentWritingPage() { + return 0; + } + + @Override + public SimpleString getStoreName() { + return null; + } + + @Override + public File getFolder() { + return null; + } + + @Override + public AddressFullMessagePolicy getAddressFullMessagePolicy() { + return null; + } + + @Override + public long getFirstPage() { + return 0; + } + + @Override + public long getPageSizeBytes() { + return 0; + } + + @Override + public long getAddressSize() { + return 0; + } + + @Override + public long getMaxSize() { + return 0; + } + + @Override + public void applySetting(AddressSettings addressSettings) { + + } + + @Override + public boolean isPaging() { + return false; + } + + @Override + public void sync() throws Exception { + + } + + @Override + public void ioSync() throws Exception { + + } + + @Override + public boolean page(ServerMessage message, + Transaction tx, + RouteContextList listCtx, + ReentrantReadWriteLock.ReadLock readLock) throws Exception { + return false; + } + + @Override + public Page createPage(int page) throws Exception { + return null; + } + + @Override + public boolean checkPageFileExists(int page) throws Exception { + return false; + } + + @Override + public PagingManager getPagingManager() { + return null; + } + + @Override + public PageCursorProvider getCursorProvider() { + return null; + } + + @Override + public void processReload() throws Exception { + + } + + @Override + public Page depage() throws Exception { + return null; + } + + @Override + public void forceAnotherPage() throws Exception { + + } + + @Override + public Page getCurrentPage() { + return null; + } + + @Override + public boolean startPaging() throws Exception { + return false; + } + + @Override + public void stopPaging() throws Exception { + + } + + @Override + public void addSize(int size) { + + } + + @Override + public boolean checkMemory(Runnable runnable) { + return false; + } + + @Override + public boolean lock(long timeout) { + return false; + } + + @Override + public void unlock() { + + } + + @Override + public void flushExecutors() { + + } + + @Override + public Collection<Integer> getCurrentIds() throws Exception { + return null; + } + + @Override + public void sendPages(ReplicationManager replicator, Collection<Integer> pageIds) throws Exception { + + } + + @Override + public void disableCleanup() { + + } + + @Override + public void enableCleanup() { + + } + + @Override + public void start() throws Exception { + + } + + @Override + public void stop() throws Exception { + + } + + @Override + public boolean isStarted() { + return false; + } + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/SendReceiveMultiThreadTest.java ---------------------------------------------------------------------- diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/SendReceiveMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/SendReceiveMultiThreadTest.java new file mode 100644 index 0000000..c55ac22 --- /dev/null +++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/SendReceiveMultiThreadTest.java @@ -0,0 +1,297 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.performance.storage; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.io.File; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Assert; +import org.junit.Test; + +public class SendReceiveMultiThreadTest extends ActiveMQTestBase { + + final String DIRECTORY = "./target/journaltmp"; + + ConnectionFactory cf; + + Destination destination; + + AtomicInteger received = new AtomicInteger(0); + + AtomicInteger sent = new AtomicInteger(0); + + int NUMBER_OF_THREADS = 400; + int NUMBER_OF_MESSAGES = 5000; + + CountDownLatch receivedLatch = new CountDownLatch(NUMBER_OF_MESSAGES * NUMBER_OF_THREADS); + + @Test + public void testMultipleWrites() throws Exception { + deleteDirectory(new File(DIRECTORY)); + ActiveMQServer server = createServer(true); + server.getConfiguration().setJournalFileSize(10 * 1024 * 1024); + server.getConfiguration().setJournalMinFiles(2); + server.getConfiguration().setJournalCompactMinFiles(ActiveMQDefaultConfiguration.getDefaultJournalCompactMinFiles()); + server.getConfiguration().setJournalCompactPercentage(ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage()); + server.getConfiguration().setJournalType(JournalType.ASYNCIO); + server.getConfiguration().addAcceptorConfiguration("core", DefaultConnectionProperties.DEFAULT_BROKER_BIND_URL); + server.getConfiguration().setJournalDirectory(DIRECTORY + "/journal"); + server.getConfiguration().setBindingsDirectory(DIRECTORY + "/bindings"); + server.getConfiguration().setPagingDirectory(DIRECTORY + "/paging"); + server.getConfiguration().setLargeMessagesDirectory(DIRECTORY + "/largemessage"); + server.getConfiguration().setJournalMaxIO_AIO(200); + + // TODO Setup Acceptors + + server.start(); + + Queue queue = server.createQueue(SimpleString.toSimpleString("jms.queue.performanceQueue"), SimpleString.toSimpleString("jms.queue.performanceQueue"), null, true, false); + + Queue queue2 = server.createQueue(SimpleString.toSimpleString("jms.queue.stationaryQueue"), SimpleString.toSimpleString("jms.queue.stationaryQueue"), null, true, false); + + MyThread[] threads = new MyThread[NUMBER_OF_THREADS]; + + ConsumerThread[] cthreads = new ConsumerThread[NUMBER_OF_THREADS]; + + final CountDownLatch alignFlag = new CountDownLatch(NUMBER_OF_THREADS); + final CountDownLatch startFlag = new CountDownLatch(1); + final CountDownLatch finishFlag = new CountDownLatch(NUMBER_OF_THREADS); + + cf = new ActiveMQConnectionFactory(); + + Thread slowSending = new Thread() { + public void run() { + Connection conn = null; + try { + conn = cf.createConnection(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(ActiveMQJMSClient.createQueue("stationaryQueue")); + + conn.start(); + MessageConsumer consumer = session.createConsumer(ActiveMQJMSClient.createQueue("stationaryQueue")); + + while (true) { + for (int i = 0; i < 10; i++) { + System.out.println("stationed message"); + producer.send(session.createTextMessage("stationed")); + session.commit(); + + Thread.sleep(1000); + } + + for (int i = 0; i < 10; i++) { + consumer.receive(5000); + session.commit(); + System.out.println("Receiving stationed"); + Thread.sleep(1000); + } + } + } + catch (Exception e) { + e.printStackTrace(); + } + finally { + try { + conn.close(); + } + catch (Exception ignored) { + + } + } + + } + }; + + slowSending.start(); + + destination = ActiveMQJMSClient.createQueue("performanceQueue"); + + for (int i = 0; i < threads.length; i++) { + threads[i] = new MyThread("sender::" + i, NUMBER_OF_MESSAGES, alignFlag, startFlag, finishFlag); + cthreads[i] = new ConsumerThread(NUMBER_OF_MESSAGES); + } + + for (ConsumerThread t : cthreads) { + t.start(); + } + + for (MyThread t : threads) { + t.start(); + } + + Assert.assertEquals(NUMBER_OF_THREADS, queue.getConsumerCount()); + + alignFlag.await(); + + long startTime = System.currentTimeMillis(); + startFlag.countDown(); + + // I'm using a countDown to avoid measuring time spent on thread context from join. + // i.e. i want to measure as soon as the loops are done + finishFlag.await(); + long endtime = System.currentTimeMillis(); + + receivedLatch.await(); + long endTimeConsuming = System.currentTimeMillis(); + + for (ConsumerThread t : cthreads) { + t.join(); + Assert.assertEquals(0, t.errors); + } + + for (MyThread t : threads) { + t.join(); + Assert.assertEquals(0, t.errors.get()); + } + + slowSending.interrupt(); + slowSending.join(); + + server.stop(); + + System.out.println("Time on sending:: " + (endtime - startTime)); + System.out.println("Time on consuming:: " + (endTimeConsuming - startTime)); + } + + class ConsumerThread extends Thread { + + final int numberOfMessages; + + Connection connection; + Session session; + + MessageConsumer consumer; + + ConsumerThread(int numberOfMessages) throws Exception { + super("consumerthread"); + this.numberOfMessages = numberOfMessages; + + connection = cf.createConnection(); + session = connection.createSession(true, Session.SESSION_TRANSACTED); + consumer = session.createConsumer(destination); + connection.start(); + } + + int errors = 0; + + public void run() { + try { + + for (int i = 0; i < numberOfMessages; i++) { + Message message = consumer.receive(50000); + if (message == null) { + System.err.println("Could not receive message at i = " + numberOfMessages); + errors++; + break; + } + + int r = received.incrementAndGet(); + + if (r % 1000 == 0) { + System.out.println("Received " + r + " messages"); + } + + if (i % 50 == 0) { + session.commit(); + } + + receivedLatch.countDown(); + } + session.commit(); + connection.close(); + } + catch (Exception e) { + e.printStackTrace(); + errors++; + } + + } + } + + class MyThread extends Thread { + + final int numberOfMessages; + final AtomicInteger errors = new AtomicInteger(0); + + final CountDownLatch align; + final CountDownLatch start; + final CountDownLatch finish; + + MyThread(String name, int numberOfMessages, CountDownLatch align, CountDownLatch start, CountDownLatch finish) { + super(name); + this.numberOfMessages = numberOfMessages; + this.align = align; + this.start = start; + this.finish = finish; + } + + public void run() { + try { + + Connection connection = cf.createConnection(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + MessageProducer producer = session.createProducer(destination); + + align.countDown(); + start.await(); + + for (int i = 0; i < numberOfMessages; i++) { + BytesMessage msg = session.createBytesMessage(); + msg.writeBytes(new byte[1024]); + producer.send(msg); + session.commit(); + + int s = sent.incrementAndGet(); + if (s % 1000 == 0) { + System.out.println("Sent " + s); + } + } + + connection.close(); + System.out.println("Send " + numberOfMessages + " messages on thread " + Thread.currentThread().getName()); + } + catch (Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + finally { + finish.countDown(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/AddAndRemoveStressTest.java ---------------------------------------------------------------------- diff --git a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/AddAndRemoveStressTest.java b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/AddAndRemoveStressTest.java index 5c7ec19..eb49a99 100644 --- a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/AddAndRemoveStressTest.java +++ b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/AddAndRemoveStressTest.java @@ -75,7 +75,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase { public void testInsertAndLoad() throws Exception { SequentialFileFactory factory = new AIOSequentialFileFactory(getTestDirfile(), 1000); - JournalImpl impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000); + JournalImpl impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000); impl.start(); @@ -91,7 +91,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase { impl.stop(); factory = new AIOSequentialFileFactory(getTestDirfile(), 1000); - impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000); + impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000); impl.start(); @@ -108,7 +108,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase { impl.stop(); factory = new AIOSequentialFileFactory(getTestDirfile(), 1000); - impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000); + impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000); impl.start(); @@ -136,7 +136,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase { public void testInsertUpdateAndLoad() throws Exception { SequentialFileFactory factory = new AIOSequentialFileFactory(getTestDirfile(), 1000); - JournalImpl impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000); + JournalImpl impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000); impl.start(); @@ -153,7 +153,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase { impl.stop(); factory = new AIOSequentialFileFactory(getTestDirfile(), 1000); - impl = new JournalImpl(10 * 1024 * 1024, 10, 0, 0, factory, "amq", "amq", 1000); + impl = new JournalImpl(10 * 1024 * 1024, 10, 10, 0, 0, factory, "amq", "amq", 1000); impl.start(); @@ -170,7 +170,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase { impl.stop(); factory = new AIOSequentialFileFactory(getTestDirfile(), 1000); - impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000); + impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000); impl.start(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java ---------------------------------------------------------------------- diff --git a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java index 75600a1..4d9eedc 100644 --- a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java +++ b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java @@ -115,7 +115,7 @@ public class JournalCleanupCompactStressTest extends ActiveMQTestBase { maxAIO = ActiveMQDefaultConfiguration.getDefaultJournalMaxIoNio(); } - journal = new JournalImpl(50 * 1024, 20, 50, ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage(), factory, "activemq-data", "amq", maxAIO) { + journal = new JournalImpl(50 * 1024, 20, 20, 50, ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage(), factory, "activemq-data", "amq", maxAIO) { @Override protected void onCompactLockingTheJournal() throws Exception { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/MixupCompactorTestBase.java ---------------------------------------------------------------------- diff --git a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/MixupCompactorTestBase.java b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/MixupCompactorTestBase.java index a6133b2..93e631b 100644 --- a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/MixupCompactorTestBase.java +++ b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/MixupCompactorTestBase.java @@ -98,7 +98,7 @@ public abstract class MixupCompactorTestBase extends JournalImplTestBase { @Override public void createJournal() throws Exception { - journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) { + journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) { @Override public void onCompactDone() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/NIOMultiThreadCompactorStressTest.java ---------------------------------------------------------------------- diff --git a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/NIOMultiThreadCompactorStressTest.java b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/NIOMultiThreadCompactorStressTest.java index bdb68d5..b555302 100644 --- a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/NIOMultiThreadCompactorStressTest.java +++ b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/NIOMultiThreadCompactorStressTest.java @@ -84,7 +84,7 @@ public class NIOMultiThreadCompactorStressTest extends ActiveMQTestBase { stopServer(); NIOSequentialFileFactory factory = new NIOSequentialFileFactory(new File(getJournalDir()), 1); - JournalImpl journal = new JournalImpl(ActiveMQDefaultConfiguration.getDefaultJournalFileSize(), 2, 0, 0, factory, "activemq-data", "amq", 100); + JournalImpl journal = new JournalImpl(ActiveMQDefaultConfiguration.getDefaultJournalFileSize(), 2, 2, 0, 0, factory, "activemq-data", "amq", 100); List<RecordInfo> committedRecords = new ArrayList<>(); List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java index e01263b..1872665 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java @@ -141,7 +141,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { factory = new FakeSequentialFileFactory(512, true); try { - journalImpl = new JournalImpl(2000, 2, 0, 0, factory, "tt", "tt", 1000); + journalImpl = new JournalImpl(2000, 2, 2, 0, 0, factory, "tt", "tt", 1000); Assert.fail("Expected IllegalArgumentException"); } catch (IllegalArgumentException ignored) { @@ -1201,7 +1201,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { public void testAlignmentOverReload() throws Exception { factory = new FakeSequentialFileFactory(512, false); - journalImpl = new JournalImpl(512 + 512 * 3, 20, 0, 0, factory, "amq", "amq", 1000); + journalImpl = new JournalImpl(512 + 512 * 3, 20, 20, 0, 0, factory, "amq", "amq", 1000); journalImpl.start(); @@ -1214,7 +1214,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { journalImpl.stop(); - journalImpl = new JournalImpl(512 + 1024 + 512, 20, 0, 0, factory, "amq", "amq", 1000); + journalImpl = new JournalImpl(512 + 1024 + 512, 20, 20, 0, 0, factory, "amq", "amq", 1000); addActiveMQComponent(journalImpl); journalImpl.start(); journalImpl.load(AlignedJournalImplTest.dummyLoader); @@ -1230,7 +1230,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { journalImpl.stop(); - journalImpl = new JournalImpl(512 + 1024 + 512, 20, 0, 0, factory, "amq", "amq", 1000); + journalImpl = new JournalImpl(512 + 1024 + 512, 20, 20, 0, 0, factory, "amq", "amq", 1000); addActiveMQComponent(journalImpl); journalImpl.start(); @@ -1301,7 +1301,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { journalImpl.stop(); } - journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, 0, 0, factory, "tt", "tt", 1000); + journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, numberOfMinimalFiles, 0, 0, factory, "tt", "tt", 1000); addActiveMQComponent(journalImpl); journalImpl.start(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java index f8a907a..c3b37e1 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java @@ -209,7 +209,7 @@ public class JournalAsyncTest extends ActiveMQTestBase { journalImpl.stop(); } - journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, 0, 0, factory, "tt", "tt", 1000); + journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, numberOfMinimalFiles, 0, 0, factory, "tt", "tt", 1000); journalImpl.start(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java index e502ff7..6c0564a 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java @@ -54,6 +54,8 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase { protected int minFiles; + protected int poolSize; + protected int fileSize; protected boolean sync; @@ -122,7 +124,16 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase { // --------------------------------------------------------------------------------- protected void setup(final int minFreeFiles, final int fileSize, final boolean sync, final int maxAIO) { + this.minFiles = minFreeFiles; + this.poolSize = minFreeFiles; + this.fileSize = fileSize; + this.sync = sync; + this.maxAIO = maxAIO; + } + + protected void setup(final int minFreeFiles, final int poolSize, final int fileSize, final boolean sync, final int maxAIO) { minFiles = minFreeFiles; + this.poolSize = poolSize; this.fileSize = fileSize; this.sync = sync; this.maxAIO = maxAIO; @@ -130,13 +141,14 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase { protected void setup(final int minFreeFiles, final int fileSize, final boolean sync) { minFiles = minFreeFiles; + poolSize = minFreeFiles; this.fileSize = fileSize; this.sync = sync; maxAIO = 50; } public void createJournal() throws Exception { - journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) { + journal = new JournalImpl(fileSize, minFiles, poolSize, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) { @Override public void onCompactDone() { latchDone.countDown(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java index 02f1e9e..e714040 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java @@ -121,7 +121,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { @Test public void testParams() throws Exception { try { - new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, 0, 0, fileFactory, filePrefix, fileExtension, 1); + new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, 10, 0, 0, fileFactory, filePrefix, fileExtension, 1); Assert.fail("Should throw exception"); } @@ -130,7 +130,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { } try { - new JournalImpl(10 * 1024, 1, 0, 0, fileFactory, filePrefix, fileExtension, 1); + new JournalImpl(10 * 1024, 1, 0, 0, 0, fileFactory, filePrefix, fileExtension, 1); Assert.fail("Should throw exception"); } @@ -139,7 +139,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { } try { - new JournalImpl(10 * 1024, 10, 0, 0, null, filePrefix, fileExtension, 1); + new JournalImpl(10 * 1024, 10, 0, 0, 0, null, filePrefix, fileExtension, 1); Assert.fail("Should throw exception"); } @@ -148,7 +148,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { } try { - new JournalImpl(10 * 1024, 10, 0, 0, fileFactory, null, fileExtension, 1); + new JournalImpl(10 * 1024, 10, 0, 0, 0, fileFactory, null, fileExtension, 1); Assert.fail("Should throw exception"); } @@ -157,7 +157,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { } try { - new JournalImpl(10 * 1024, 10, 0, 0, fileFactory, filePrefix, null, 1); + new JournalImpl(10 * 1024, 10, 0, 0, 0, fileFactory, filePrefix, null, 1); Assert.fail("Should throw exception"); } @@ -166,7 +166,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { } try { - new JournalImpl(10 * 1024, 10, 0, 0, fileFactory, filePrefix, null, 0); + new JournalImpl(10 * 1024, 10, 0, 0, 0, fileFactory, filePrefix, null, 0); Assert.fail("Should throw exception"); } @@ -567,6 +567,103 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { stopJournal(); } + + @Test + public void testOrganicallyGrowNoLimit() throws Exception { + setup(2, -1, 10 * 1024, true, 50); + createJournal(); + journal.setAutoReclaim(true); + startJournal(); + load(); + + List<String> files1 = fileFactory.listFiles(fileExtension); + + Assert.assertEquals(2, files1.size()); + + Assert.assertEquals(0, journal.getDataFilesCount()); + Assert.assertEquals(0, journal.getFreeFilesCount()); + Assert.assertEquals(1, journal.getOpenedFilesCount()); + Assert.assertEquals(0, journal.getIDMapSize()); + + // Fill all the files + + for (int i = 0; i < 200; i++) { + add(i); + journal.forceMoveNextFile(); + } + + + for (int i = 0; i < 200; i++) { + delete(i); + } + journal.forceMoveNextFile(); + + journal.checkReclaimStatus(); + + + + files1 = fileFactory.listFiles(fileExtension); + Assert.assertTrue(files1.size() > 200); + + int numberOfFiles = files1.size(); + + for (int i = 300; i < 350; i++) { + add(i); + journal.forceMoveNextFile(); + } + journal.checkReclaimStatus(); + + + files1 = fileFactory.listFiles(fileExtension); + Assert.assertTrue(files1.size() > 200); + + Assert.assertEquals(numberOfFiles, files1.size()); + + System.out.println("we have " + files1.size() + " files now"); + + stopJournal(); + } + + @Test + public void testOrganicallyWithALimit() throws Exception { + setup(2, 5, 10 * 1024, true, 50); + createJournal(); + journal.setAutoReclaim(true); + startJournal(); + load(); + + List<String> files1 = fileFactory.listFiles(fileExtension); + + Assert.assertEquals(2, files1.size()); + + Assert.assertEquals(0, journal.getDataFilesCount()); + Assert.assertEquals(0, journal.getFreeFilesCount()); + Assert.assertEquals(1, journal.getOpenedFilesCount()); + Assert.assertEquals(0, journal.getIDMapSize()); + + // Fill all the files + + for (int i = 0; i < 200; i++) { + add(i); + journal.forceMoveNextFile(); + } + + journal.checkReclaimStatus(); + + + for (int i = 0; i < 200; i++) { + delete(i); + } + journal.forceMoveNextFile(); + + journal.checkReclaimStatus(); + + files1 = fileFactory.listFiles(fileExtension); + Assert.assertTrue("supposed to have less than 10 but it had " + files1.size() + " files created", files1.size() < 10); + + stopJournal(); + } + // Validate the methods that are used on assertions @Test public void testCalculations() throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java index 554a5e3..88cbcaa 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java @@ -39,7 +39,7 @@ public class BatchIDGeneratorUnitTest extends ActiveMQTestBase { @Test public void testSequence() throws Exception { NIOSequentialFileFactory factory = new NIOSequentialFileFactory(new File(getTestDir()), 1); - Journal journal = new JournalImpl(10 * 1024, 2, 0, 0, factory, "activemq-bindings", "bindings", 1); + Journal journal = new JournalImpl(10 * 1024, 2, 2, 0, 0, factory, "activemq-bindings", "bindings", 1); journal.start();
