ARTEMIS-822 Review journal threading model https://issues.apache.org/jira/browse/ARTEMIS-822
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6afde8f4 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6afde8f4 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6afde8f4 Branch: refs/heads/master Commit: 6afde8f45aaa4f6a477066f3bc85fa8f89718a1d Parents: 4b47461 Author: Clebert Suconic <[email protected]> Authored: Thu Oct 27 12:32:04 2016 -0400 Committer: Clebert Suconic <[email protected]> Committed: Fri Oct 28 16:54:59 2016 -0400 ---------------------------------------------------------------------- .../jdbc/store/journal/JDBCJournalImpl.java | 4 + .../activemq/artemis/core/journal/Journal.java | 5 + .../core/journal/impl/FileWrapperJournal.java | 4 + .../artemis/core/journal/impl/JournalImpl.java | 236 ++++++++++++------- .../core/journal/impl/JournalTransaction.java | 2 +- .../cursor/impl/PageSubscriptionImpl.java | 3 +- .../journal/AbstractJournalStorageManager.java | 20 +- .../impl/journal/JDBCJournalStorageManager.java | 2 - .../impl/journal/JournalStorageManager.java | 21 +- .../core/replication/ReplicatedJournal.java | 5 + .../artemis/core/server/ServiceRegistry.java | 4 + .../core/server/impl/ActiveMQServerImpl.java | 61 +++-- .../core/server/impl/ServiceRegistryImpl.java | 12 + .../byteman/JMSBridgeReconnectionTest.java | 2 +- .../journal/NIOJournalCompactTest.java | 2 + .../journal/ValidateTransactionHealthTest.java | 2 + .../management/ActiveMQServerControlTest.java | 9 +- .../replication/ReplicationTest.java | 5 + .../journal/impl/AlignedJournalImplTest.java | 4 +- .../core/journal/impl/JournalImplTestUnit.java | 74 +++--- 20 files changed, 308 insertions(+), 169 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index 636309e..e112dbc 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -114,6 +114,10 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } @Override + public void flush() throws Exception { + } + + @Override protected void createSchema() throws SQLException { createTable(sqlProvider.getCreateJournalTableSQL()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java index 3c1f7fd..fbd4182 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java @@ -237,4 +237,9 @@ public interface Journal extends ActiveMQComponent { * only be called once the synchronization of the backup and live servers is completed. */ void replicationSyncFinished(); + + /** + * It will make sure there are no more pending operations on the Executors. + * */ + void flush() throws Exception; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java index 51fb154..0b702a5 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java @@ -98,6 +98,10 @@ public final class FileWrapperJournal extends JournalBase { writeRecord(addRecord, sync, callback); } + @Override + public void flush() throws Exception { + } + /** * Write the record to the current file. */ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 43db1f7..983bd7d 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -18,6 +18,8 @@ package org.apache.activemq.artemis.core.journal.impl; import java.io.Serializable; import java.nio.ByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -29,14 +31,13 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.Executor; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -70,8 +71,11 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalR import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalRollbackRecordTX; import org.apache.activemq.artemis.journal.ActiveMQJournalBundle; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.artemis.utils.DataConstants; +import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.apache.activemq.artemis.utils.SimpleFuture; import org.jboss.logging.Logger; /** @@ -163,7 +167,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // Compacting may replace this structure private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<>(); - private final Set<Long> pendingRecords = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>()); + private final Set<Long> pendingRecords = new ConcurrentHashSet<>(); // Compacting may replace this structure private final ConcurrentMap<Long, JournalTransaction> transactions = new ConcurrentHashMap<>(); @@ -173,14 +177,18 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal private final AtomicBoolean compactorRunning = new AtomicBoolean(); - private ExecutorService filesExecutor = null; + private Executor filesExecutor = null; - private ExecutorService compactorExecutor = null; + private Executor compactorExecutor = null; - private ExecutorService appendExecutor = null; + private Executor appendExecutor = null; private ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<>(); + private final OrderedExecutorFactory providedIOThreadPool; + protected OrderedExecutorFactory ioExecutorFactory; + private ThreadPoolExecutor threadPool; + /** * We don't lock the journal during the whole compacting operation. During compacting we only * lock it (i) when gathering the initial structure, and (ii) when replicating the structures @@ -223,8 +231,25 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final String fileExtension, final int maxAIO, final int userVersion) { + this(null, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, userVersion); + } + + public JournalImpl(final OrderedExecutorFactory ioExecutors, + final int fileSize, + final int minFiles, + final int poolSize, + final int compactMinFiles, + final int compactPercentage, + final SequentialFileFactory fileFactory, + final String filePrefix, + final String fileExtension, + final int maxAIO, + final int userVersion) { + super(fileFactory.isSupportsCallbacks(), fileSize); + this.providedIOThreadPool = ioExecutors; + if (fileSize % fileFactory.getAlignment() != 0) { throw new IllegalArgumentException("Invalid journal-file-size " + fileSize + ", It should be multiple of " + fileFactory.getAlignment()); @@ -693,7 +718,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal lineUpContext(callback); pendingRecords.add(id); - Future<?> result = appendExecutor.submit(new Runnable() { + + final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback); + appendExecutor.execute(new Runnable() { @Override public void run() { journalLock.readLock().lock(); @@ -710,7 +737,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal ", usedFile = " + usedFile); } + if (result != null) { + result.set(true); + } } catch (Exception e) { + if (result != null) { + result.fail(e); + } ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); } finally { pendingRecords.remove(id); @@ -719,7 +752,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } }); - if (sync && callback == null) { + if (result != null) { result.get(); } } @@ -734,7 +767,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal lineUpContext(callback); checkKnownRecordID(id); - Future<?> result = appendExecutor.submit(new Runnable() { + final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback); + + appendExecutor.execute(new Runnable() { @Override public void run() { journalLock.readLock().lock(); @@ -758,7 +793,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } else { jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize()); } + + if (result != null) { + result.set(true); + } } catch (Exception e) { + if (result != null) { + result.fail(e); + } ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); } finally { journalLock.readLock().unlock(); @@ -766,7 +808,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } }); - if (sync && callback == null) { + if (result != null) { result.get(); } } @@ -777,7 +819,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal lineUpContext(callback); checkKnownRecordID(id); - Future<?> result = appendExecutor.submit(new Runnable() { + final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback); + appendExecutor.execute(new Runnable() { @Override public void run() { journalLock.readLock().lock(); @@ -801,7 +844,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } else { record.delete(usedFile); } + if (result != null) { + result.set(true); + } } catch (Exception e) { + if (result != null) { + result.fail(e); + } ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); } finally { journalLock.readLock().unlock(); @@ -809,11 +858,15 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } }); - if (sync && callback == null) { + if (result != null) { result.get(); } } + private static SimpleFuture<Boolean> newSyncAndCallbackResult(boolean sync, IOCompletion callback) { + return (sync && callback == null) ? new SimpleFuture<>() : null; + } + @Override public void appendAddRecordTransactional(final long txID, final long id, @@ -824,7 +877,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final JournalTransaction tx = getTransactionInfo(txID); tx.checkErrorCondition(); - appendExecutor.submit(new Runnable() { + appendExecutor.execute(new Runnable() { @Override public void run() { @@ -860,15 +913,18 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal return; } + final SimpleFuture<Boolean> known = new SimpleFuture<>(); + // retry on the append thread. maybe the appender thread is not keeping up. - Future<Boolean> known = appendExecutor.submit(new Callable<Boolean>() { + appendExecutor.execute(new Runnable() { @Override - public Boolean call() throws Exception { + public void run() { journalLock.readLock().lock(); try { - return records.containsKey(id) + + known.set(records.containsKey(id) || pendingRecords.contains(id) - || (compactor != null && compactor.lookupRecord(id)); + || (compactor != null && compactor.lookupRecord(id))); } finally { journalLock.readLock().unlock(); } @@ -900,7 +956,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final JournalTransaction tx = getTransactionInfo(txID); tx.checkErrorCondition(); - appendExecutor.submit(new Runnable() { + appendExecutor.execute(new Runnable() { @Override public void run() { @@ -941,7 +997,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final JournalTransaction tx = getTransactionInfo(txID); tx.checkErrorCondition(); - appendExecutor.submit(new Runnable() { + appendExecutor.execute(new Runnable() { @Override public void run() { journalLock.readLock().lock(); @@ -991,7 +1047,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final JournalTransaction tx = getTransactionInfo(txID); tx.checkErrorCondition(); - Future<?> result = appendExecutor.submit(new Runnable() { + final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback); + + appendExecutor.execute(new Runnable() { @Override public void run() { journalLock.readLock().lock(); @@ -1004,7 +1062,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } tx.prepare(usedFile); + if (result != null) { + result.set(true); + } } catch (Exception e) { + if (result != null) { + result.fail(e); + } ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); setErrorCondition(tx, e); } finally { @@ -1013,7 +1077,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } }); - if (sync && callback == null) { + if (result != null) { result.get(); tx.checkErrorCondition(); } @@ -1055,8 +1119,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } tx.checkErrorCondition(); + final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback); - Future<?> result = appendExecutor.submit(new Runnable() { + appendExecutor.execute(new Runnable() { @Override public void run() { journalLock.readLock().lock(); @@ -1070,7 +1135,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } tx.commit(usedFile); + if (result != null) { + result.set(true); + } } catch (Exception e) { + if (result != null) { + result.fail(e); + } ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); setErrorCondition(tx, e); } finally { @@ -1079,7 +1150,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } }); - if (sync && callback == null) { + if (result != null) { result.get(); tx.checkErrorCondition(); } @@ -1097,8 +1168,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } tx.checkErrorCondition(); - - Future<?> result = appendExecutor.submit(new Runnable() { + final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback); + appendExecutor.execute(new Runnable() { @Override public void run() { journalLock.readLock().lock(); @@ -1107,7 +1178,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback); tx.rollback(usedFile); + if (result != null) { + result.set(true); + } } catch (Exception e) { + if (result != null) { + result.fail(e); + } ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); setErrorCondition(tx, e); } finally { @@ -1116,7 +1193,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } }); - if (sync && callback == null) { + if (result != null) { result.get(); tx.checkErrorCondition(); } @@ -1981,35 +2058,39 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal public void debugWait() throws InterruptedException { fileFactory.flush(); - if (appendExecutor != null && !appendExecutor.isShutdown()) { - // Send something to the closingExecutor, just to make sure we went until its end - final CountDownLatch latch = newLatch(1); + flushExecutor(filesExecutor); - appendExecutor.execute(new Runnable() { + flushExecutor(appendExecutor); + } - @Override - public void run() { - latch.countDown(); - } + @Override + public void flush() throws Exception { + fileFactory.flush(); - }); - awaitLatch(latch, -1); - } - if (filesExecutor != null && !filesExecutor.isShutdown()) { + flushExecutor(appendExecutor); + + flushExecutor(filesExecutor); + + flushExecutor(compactorExecutor); + } + + private void flushExecutor(Executor executor) throws InterruptedException { + + if (executor != null) { // Send something to the closingExecutor, just to make sure we went until its end - final CountDownLatch latch = newLatch(1); + final CountDownLatch latch = new CountDownLatch(1); + + executor.execute(new Runnable() { - filesExecutor.execute(new Runnable() { @Override public void run() { latch.countDown(); } - }); - awaitLatch(latch, -1); + }); + latch.await(10, TimeUnit.SECONDS); } - } @Override @@ -2099,7 +2180,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } }; - appendExecutor.submit(new Runnable() { + appendExecutor.execute(new Runnable() { @Override public void run() { journalLock.readLock().lock(); @@ -2132,29 +2213,25 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal throw new IllegalStateException("Journal " + this + " is not stopped, state is " + state); } - filesExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { - - @Override - public Thread newThread(final Runnable r) { - return new Thread(r, "JournalImpl::FilesExecutor"); - } - }); + if (providedIOThreadPool == null) { + ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() { + @Override + public ThreadFactory run() { + return new ActiveMQThreadFactory("ArtemisIOThread", true, JournalImpl.class.getClassLoader()); + } + }); - compactorExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { + threadPool = new ThreadPoolExecutor(0,Integer.MAX_VALUE, 60L,TimeUnit.SECONDS, new SynchronousQueue<>(), factory); + ioExecutorFactory = new OrderedExecutorFactory(threadPool); + } else { + ioExecutorFactory = providedIOThreadPool; + } - @Override - public Thread newThread(final Runnable r) { - return new Thread(r, "JournalImpl::CompactorExecutor"); - } - }); + filesExecutor = ioExecutorFactory.getExecutor(); - appendExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { + compactorExecutor = ioExecutorFactory.getExecutor(); - @Override - public Thread newThread(final Runnable r) { - return new Thread(r, "JournalImpl::appendExecutor"); - } - }); + appendExecutor = ioExecutorFactory.getExecutor(); filesRepository.setExecutor(filesExecutor); @@ -2171,29 +2248,21 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal setJournalState(JournalState.STOPPED); - // appendExecutor must be shut down first - appendExecutor.shutdown(); - - if (!appendExecutor.awaitTermination(60, TimeUnit.SECONDS)) { - ActiveMQJournalLogger.LOGGER.couldNotStopJournalAppendExecutor(); - } + flush(); - journalLock.writeLock().lock(); - try { - compactorExecutor.shutdown(); + if (providedIOThreadPool == null) { + threadPool.shutdown(); - if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS)) { - ActiveMQJournalLogger.LOGGER.couldNotStopCompactor(); + if (!threadPool.awaitTermination(120, TimeUnit.SECONDS)) { + threadPool.shutdownNow(); } + threadPool = null; + ioExecutorFactory = null; + } - filesExecutor.shutdown(); - - filesRepository.setExecutor(null); - - if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS)) { - ActiveMQJournalLogger.LOGGER.couldNotStopJournalExecutor(); - } + journalLock.writeLock().lock(); + try { try { for (CountDownLatch latch : latches) { latch.countDown(); @@ -2207,7 +2276,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal if (currentFile != null && currentFile.getFile().isOpen()) { currentFile.getFile().close(); } - filesRepository.clear(); fileFactory.stop(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java index 1542bd4..8e40f3b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java @@ -229,7 +229,7 @@ public class JournalTransaction { public void commit(final JournalFile file) { JournalCompactor compactor = journal.getCompactor(); - if (compacting) { + if (compacting && compactor != null) { compactor.addCommandCommit(this, file); } else { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index 063722c..c40d20d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -192,7 +192,8 @@ final class PageSubscriptionImpl implements PageSubscription { @Override public void reloadPageCompletion(PagePosition position) throws Exception { // if the current page is complete, we must move it out of the way - if (pageStore.getCurrentPage().getPageId() == position.getPageNr()) { + if (pageStore != null && pageStore.getCurrentPage() != null && + pageStore.getCurrentPage().getPageId() == position.getPageNr()) { pageStore.forceAnotherPage(); } PageCursorInfo info = new PageCursorInfo(position.getPageNr(), position.getMessageNr(), null); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index a6938d6..768be45 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -19,11 +19,9 @@ package org.apache.activemq.artemis.core.persistence.impl.journal; import javax.transaction.xa.Xid; import java.io.File; import java.io.FileInputStream; -import java.security.AccessController; import java.security.DigestInputStream; import java.security.InvalidParameterException; import java.security.MessageDigest; -import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -34,8 +32,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -103,7 +99,6 @@ import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; -import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.IDGenerator; @@ -168,7 +163,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { final Executor executor; - ExecutorService singleThreadExecutor; + Executor singleThreadExecutor; private final boolean syncTransactional; @@ -286,10 +281,6 @@ public abstract class AbstractJournalStorageManager implements StorageManager { OperationContextImpl.setContext(context); } - public Executor getSingleThreadExecutor() { - return singleThreadExecutor; - } - @Override public OperationContext newSingleThreadContext() { return newContext(singleThreadExecutor); @@ -1429,12 +1420,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { beforeStart(); - singleThreadExecutor = Executors.newSingleThreadExecutor(AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() { - @Override - public ActiveMQThreadFactory run() { - return new ActiveMQThreadFactory("ActiveMQ-IO-SingleThread", true, JournalStorageManager.class.getClassLoader()); - } - })); + singleThreadExecutor = executorFactory.getExecutor(); bindingsJournal.start(); @@ -1490,8 +1476,6 @@ public abstract class AbstractJournalStorageManager implements StorageManager { messageJournal.stop(); - singleThreadExecutor.shutdown(); - journalLoaded = false; started = false; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java index a0f0ed1..d97f988 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java @@ -101,8 +101,6 @@ public class JDBCJournalStorageManager extends JournalStorageManager { messageJournal.stop(); largeMessagesFactory.stop(); - singleThreadExecutor.shutdown(); - journalLoaded = false; started = false; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 9eaa203..2d8411a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -197,14 +198,18 @@ public class JournalStorageManager extends AbstractJournalStorageManager { } final CountDownLatch latch = new CountDownLatch(1); - executor.execute(new Runnable() { - @Override - public void run() { - latch.countDown(); - } - }); + try { + executor.execute(new Runnable() { + @Override + public void run() { + latch.countDown(); + } + }); - latch.await(30, TimeUnit.SECONDS); + latch.await(30, TimeUnit.SECONDS); + } catch (RejectedExecutionException ignored) { + // that's ok + } // We cache the variable as the replicator could be changed between here and the time we call stop // since sendLiveIsStopping may issue a close back from the channel @@ -225,8 +230,6 @@ public class JournalStorageManager extends AbstractJournalStorageManager { messageJournal.stop(); - singleThreadExecutor.shutdown(); - journalLoaded = false; started = false; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java index 6668c71..d70316f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java @@ -64,6 +64,11 @@ public class ReplicatedJournal implements Journal { this.replicationManager = replicationManager; } + @Override + public void flush() throws Exception { + + } + /** * @param id * @param recordType http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java index b0fa658..0583600 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java @@ -36,6 +36,10 @@ public interface ServiceRegistry { void setExecutorService(ExecutorService executorService); + ExecutorService getIOExecutorService(); + + void setIOExecutorService(ExecutorService ioExecutorService); + ScheduledExecutorService getScheduledExecutorService(); void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 98abce0..6288bdf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -38,11 +38,12 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.Semaphore; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -150,6 +151,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor; import org.apache.activemq.artemis.utils.CertificateUtil; import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.artemis.utils.ExecutorFactory; @@ -230,6 +232,14 @@ public class ActiveMQServerImpl implements ActiveMQServer { private volatile ExecutorFactory executorFactory; + + private volatile ExecutorService ioExecutorPool; + /** + * This is a thread pool for io tasks only. + * We can't use the same global executor to avoid starvations. + */ + private volatile ExecutorFactory ioExecutorFactory; + private final HierarchicalRepository<Set<Role>> securityRepository; private volatile ResourceManager resourceManager; @@ -859,17 +869,11 @@ public class ActiveMQServerImpl implements ActiveMQServer { } if (threadPool != null && !threadPoolSupplied) { - threadPool.shutdown(); - try { - if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) { - ActiveMQServerLogger.LOGGER.timedOutStoppingThreadpool(threadPool); - for (Runnable r : threadPool.shutdownNow()) { - logger.debug("Cancelled the execution of " + r); - } - } - } catch (InterruptedException e) { - ActiveMQServerLogger.LOGGER.interruptWhilstStoppingComponent(threadPool.getClass().getName()); - } + shutdownPool(threadPool); + } + + if (ioExecutorPool != null) { + shutdownPool(ioExecutorPool); } if (!threadPoolSupplied) @@ -950,6 +954,20 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } + private void shutdownPool(ExecutorService executorService) { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { + ActiveMQServerLogger.LOGGER.timedOutStoppingThreadpool(threadPool); + for (Runnable r : executorService.shutdownNow()) { + logger.debug("Cancelled the execution of " + r); + } + } + } catch (InterruptedException e) { + ActiveMQServerLogger.LOGGER.interruptWhilstStoppingComponent(threadPool.getClass().getName()); + } + } + public boolean checkLiveIsNotColocated(String nodeId) { if (parentServer == null) { return true; @@ -1805,10 +1823,11 @@ public class ActiveMQServerImpl implements ActiveMQServer { return new ActiveMQThreadFactory("ActiveMQ-server-" + this.toString(), false, ClientSessionFactoryImpl.class.getClassLoader()); } }); + if (configuration.getThreadPoolMaxSize() == -1) { - threadPool = Executors.newCachedThreadPool(tFactory); + threadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), tFactory); } else { - threadPool = Executors.newFixedThreadPool(configuration.getThreadPoolMaxSize(), tFactory); + threadPool = new ActiveMQThreadPoolExecutor(0, configuration.getThreadPoolMaxSize(), 60L, TimeUnit.SECONDS, tFactory); } } else { threadPool = serviceRegistry.getExecutorService(); @@ -1816,6 +1835,20 @@ public class ActiveMQServerImpl implements ActiveMQServer { } this.executorFactory = new OrderedExecutorFactory(threadPool); + + if (serviceRegistry.getIOExecutorService() != null) { + this.ioExecutorFactory = new OrderedExecutorFactory(serviceRegistry.getIOExecutorService()); + } else { + ThreadFactory tFactory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() { + @Override + public ThreadFactory run() { + return new ActiveMQThreadFactory("ActiveMQ-IO-server-" + this.toString(), false, ClientSessionFactoryImpl.class.getClassLoader()); + } + }); + + this.ioExecutorPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), tFactory); + } + /* We check to see if a Scheduled Executor Service is provided in the InjectedObjectRegistry. If so we use this * Scheduled ExecutorService otherwise we create a new one. */ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java index 1d08f4a..a287a00 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java @@ -41,6 +41,8 @@ public class ServiceRegistryImpl implements ServiceRegistry { private ExecutorService executorService; + private ExecutorService ioExecutorService; + private ScheduledExecutorService scheduledExecutorService; /* We are using a List rather than HashMap here as ActiveMQ Artemis allows multiple instances of the same class to be added @@ -163,6 +165,16 @@ public class ServiceRegistryImpl implements ServiceRegistry { } @Override + public ExecutorService getIOExecutorService() { + return ioExecutorService; + } + + @Override + public void setIOExecutorService(ExecutorService ioExecutorService) { + this.ioExecutorService = ioExecutorService; + } + + @Override public void addBridgeTransformer(String name, Transformer transformer) { bridgeTransformers.put(name, transformer); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java index 0a5d52d..ef71e89 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java @@ -50,7 +50,7 @@ public class JMSBridgeReconnectionTest extends BridgeTestBase { targetClass = "org.apache.activemq.artemis.core.client.impl.ClientProducerImpl", targetMethod = "sendRegularMessage", targetLocation = "ENTRY", - action = "org.apache.activemq.artemis.tests.extras.byteman.JMSBridgeReconnectionTest.pause2($1,$2,$3);")}) + action = "org.apache.activemq.artemis.tests.extras.byteman.JMSBridgeReconnectionTest.pause2($2,$3,$4);")}) public void performCrashDestinationStopBridge() throws Exception { activeMQServer = jmsServer1; ConnectionFactoryFactory factInUse0 = cff0; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java index 2dd38ae..519ffb5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java @@ -713,6 +713,8 @@ public class NIOJournalCompactTest extends JournalImplTestBase { journal.testCompact(); } + journal.flush(); + stopJournal(); createJournal(); startJournal(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java index 2d3df3e..8f15c48 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java @@ -314,6 +314,8 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase { throw e; } + journal.flush(); + return journal; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java index 7dd2d0b..27a2838 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java @@ -58,6 +58,7 @@ import org.apache.activemq.artemis.jlibaio.LibaioContext; import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait; import org.apache.activemq.artemis.tests.unit.core.config.impl.fakes.FakeConnectorServiceFactory; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.UUIDGenerator; @@ -265,12 +266,18 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ServerLocator receiveLocator = createInVMNonHALocator(); ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator); ClientSession receiveClientSession = receiveCsf.createSession(true, false, false); - ClientConsumer consumer = receiveClientSession.createConsumer(name); + final ClientConsumer consumer = receiveClientSession.createConsumer(name); Assert.assertFalse(consumer.isClosed()); checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); serverControl.destroyQueue(name.toString(), true); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return consumer.isClosed(); + } + }, 1000, 100); Assert.assertTrue(consumer.isClosed()); checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index 9d63e1d..7d2d514 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -651,6 +651,11 @@ public final class ReplicationTest extends ActiveMQTestBase { } @Override + public void flush() throws Exception { + + } + + @Override public void appendCommitRecord(final long txID, final boolean sync) throws Exception { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java index 5e27b36..2b24296 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java @@ -434,9 +434,6 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { Assert.assertEquals(0, records.size()); Assert.assertEquals(0, transactions.size()); - - Assert.assertEquals(2, factory.listFiles("tt").size()); - } @Test @@ -944,6 +941,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { // Reclaiming should still be able to reclaim a file if a transaction was ignored journalImpl.checkReclaimStatus(); + journalImpl.flush(); Assert.assertEquals(2, factory.listFiles("tt").size()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6afde8f4/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java index 8f23c2c..eb815ae 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.RecordInfo; +import org.apache.activemq.artemis.core.journal.TestableJournal; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.tests.unit.UnitTestLogger; import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.SimpleEncoding; @@ -439,7 +440,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { /** * Use: calculateNumberOfFiles (fileSize, numberOfRecords, recordSize, numberOfRecords2, recordSize2, , ...., numberOfRecordsN, recordSizeN); */ - private int calculateNumberOfFiles(final int fileSize, final int alignment, final int... record) throws Exception { + private int calculateNumberOfFiles(TestableJournal journal, final int fileSize, final int alignment, final int... record) throws Exception { + if (journal != null) { + journal.flush(); + } int headerSize = calculateRecordSize(JournalImpl.SIZE_HEADER, alignment); int currentPosition = headerSize; int totalFiles = 0; @@ -489,7 +493,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { add(i); } - int numberOfFiles = calculateNumberOfFiles(10 * 1024, journal.getAlignment(), 91, JournalImpl.SIZE_ADD_RECORD + recordLength); + int numberOfFiles = calculateNumberOfFiles(journal, 10 * 1024, journal.getAlignment(), 91, JournalImpl.SIZE_ADD_RECORD + recordLength); Assert.assertEquals(numberOfFiles, journal.getDataFilesCount()); Assert.assertEquals(0, journal.getFreeFilesCount()); @@ -512,7 +516,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { add(i); } - numberOfFiles = calculateNumberOfFiles(10 * 1024, journal.getAlignment(), 95, JournalImpl.SIZE_ADD_RECORD + recordLength); + numberOfFiles = calculateNumberOfFiles(journal, 10 * 1024, journal.getAlignment(), 95, JournalImpl.SIZE_ADD_RECORD + recordLength); Assert.assertEquals(numberOfFiles, journal.getDataFilesCount()); Assert.assertEquals(0, journal.getFreeFilesCount()); @@ -533,7 +537,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { add(i); } - numberOfFiles = calculateNumberOfFiles(10 * 1024, journal.getAlignment(), 200, JournalImpl.SIZE_ADD_RECORD + recordLength); + numberOfFiles = calculateNumberOfFiles(journal, 10 * 1024, journal.getAlignment(), 200, JournalImpl.SIZE_ADD_RECORD + recordLength); Assert.assertEquals(numberOfFiles, journal.getDataFilesCount()); Assert.assertEquals(0, journal.getFreeFilesCount()); @@ -646,14 +650,14 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { @Test public void testCalculations() throws Exception { - Assert.assertEquals(0, calculateNumberOfFiles(10 * 1024, 1, 1, 10, 2, 20)); - Assert.assertEquals(0, calculateNumberOfFiles(10 * 1024, 512, 1, 1)); - Assert.assertEquals(0, calculateNumberOfFiles(10 * 1024, 512, 19, 10)); - Assert.assertEquals(1, calculateNumberOfFiles(10 * 1024, 512, 20, 10)); - Assert.assertEquals(0, calculateNumberOfFiles(3000, 500, 2, 1000, 1, 500)); - Assert.assertEquals(1, calculateNumberOfFiles(3000, 500, 2, 1000, 1, 1000)); - Assert.assertEquals(9, calculateNumberOfFiles(10240, 1, 90, 1038, 45, 10)); - Assert.assertEquals(11, calculateNumberOfFiles(10 * 1024, 512, 60, 14 + 1024, 30, 14)); + Assert.assertEquals(0, calculateNumberOfFiles(journal, 10 * 1024, 1, 1, 10, 2, 20)); + Assert.assertEquals(0, calculateNumberOfFiles(journal, 10 * 1024, 512, 1, 1)); + Assert.assertEquals(0, calculateNumberOfFiles(journal, 10 * 1024, 512, 19, 10)); + Assert.assertEquals(1, calculateNumberOfFiles(journal, 10 * 1024, 512, 20, 10)); + Assert.assertEquals(0, calculateNumberOfFiles(journal, 3000, 500, 2, 1000, 1, 500)); + Assert.assertEquals(1, calculateNumberOfFiles(journal, 3000, 500, 2, 1000, 1, 1000)); + Assert.assertEquals(9, calculateNumberOfFiles(journal, 10240, 1, 90, 1038, 45, 10)); + Assert.assertEquals(11, calculateNumberOfFiles(journal, 10 * 1024, 512, 60, 14 + 1024, 30, 14)); } @Test @@ -862,13 +866,13 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { addTx(1, i); } - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 100, recordLength), journal.getDataFilesCount()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 100, recordLength), journal.getDataFilesCount()); Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(0, journal.getIDMapSize()); List<String> files2 = fileFactory.listFiles(fileExtension); - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 100, recordLength) + 2, files2.size()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 100, recordLength) + 2, files2.size()); Assert.assertEquals(1, journal.getOpenedFilesCount()); for (String file : files1) { @@ -879,13 +883,13 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { // Make sure nothing reclaimed - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 100, recordLength), journal.getDataFilesCount()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 100, recordLength), journal.getDataFilesCount()); Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(0, journal.getIDMapSize()); List<String> files3 = fileFactory.listFiles(fileExtension); - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 100, recordLength) + 2, files3.size()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 100, recordLength) + 2, files3.size()); Assert.assertEquals(1, journal.getOpenedFilesCount()); for (String file : files1) { @@ -898,13 +902,13 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { updateTx(1, i); } - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength), journal.getDataFilesCount()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength), journal.getDataFilesCount()); Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(0, journal.getIDMapSize()); List<String> files4 = fileFactory.listFiles(fileExtension); - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength) + 2, files4.size()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength) + 2, files4.size()); Assert.assertEquals(1, journal.getOpenedFilesCount()); for (String file : files1) { @@ -915,7 +919,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { // Make sure nothing reclaimed - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength), journal.getDataFilesCount()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength), journal.getDataFilesCount()); Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(0, journal.getIDMapSize()); @@ -934,14 +938,14 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { deleteTx(1, i); } - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX), journal.getDataFilesCount()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX), journal.getDataFilesCount()); Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(0, journal.getIDMapSize()); List<String> files7 = fileFactory.listFiles(fileExtension); - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX) + 2, files7.size()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX) + 2, files7.size()); Assert.assertEquals(1, journal.getOpenedFilesCount()); for (String file : files1) { @@ -950,13 +954,13 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { checkAndReclaimFiles(); - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX), journal.getDataFilesCount()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX), journal.getDataFilesCount()); Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(0, journal.getIDMapSize()); List<String> files8 = fileFactory.listFiles(fileExtension); - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX) + 2, files8.size()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX) + 2, files8.size()); Assert.assertEquals(1, journal.getOpenedFilesCount()); for (String file : files1) { @@ -977,13 +981,13 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { add(i); } - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX, 1, JournalImpl.SIZE_COMMIT_RECORD, 10, JournalImpl.SIZE_ADD_RECORD + recordLength), journal.getDataFilesCount()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX, 1, JournalImpl.SIZE_COMMIT_RECORD, 10, JournalImpl.SIZE_ADD_RECORD + recordLength), journal.getDataFilesCount()); Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(10, journal.getIDMapSize()); List<String> files9 = fileFactory.listFiles(fileExtension); - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX, 1, JournalImpl.SIZE_COMMIT_RECORD, 10, JournalImpl.SIZE_ADD_RECORD + recordLength) + 2, files9.size()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX, 1, JournalImpl.SIZE_COMMIT_RECORD, 10, JournalImpl.SIZE_ADD_RECORD + recordLength) + 2, files9.size()); Assert.assertEquals(1, journal.getOpenedFilesCount()); for (String file : files1) { @@ -1458,7 +1462,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { Assert.assertEquals(3, files2.size()); - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength), journal.getDataFilesCount()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength), journal.getDataFilesCount()); Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(1, journal.getOpenedFilesCount()); Assert.assertEquals(1, journal.getIDMapSize()); @@ -1467,10 +1471,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { List<String> files3 = fileFactory.listFiles(fileExtension); - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1) + 2, files3.size()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1) + 2, files3.size()); Assert.assertEquals(1, journal.getOpenedFilesCount()); - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD), journal.getDataFilesCount()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD), journal.getDataFilesCount()); Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(2, journal.getIDMapSize()); @@ -1478,10 +1482,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { List<String> files4 = fileFactory.listFiles(fileExtension); - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size()); Assert.assertEquals(1, journal.getOpenedFilesCount()); - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount()); Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(1, journal.getIDMapSize()); @@ -1549,10 +1553,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { rollback(1); // in file 1 - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount()); Assert.assertEquals(1, journal.getOpenedFilesCount()); - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount()); Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(1, journal.getIDMapSize()); @@ -1560,10 +1564,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { List<String> files4 = fileFactory.listFiles(fileExtension); - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size()); Assert.assertEquals(1, journal.getOpenedFilesCount()); - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount()); Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(0, journal.getIDMapSize()); @@ -1669,7 +1673,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { Assert.assertEquals(3, files2.size()); - Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength), journal.getDataFilesCount()); + Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength), journal.getDataFilesCount()); Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(1, journal.getOpenedFilesCount()); Assert.assertEquals(1, journal.getIDMapSize());
