ARTEMIS-822 Injecting IO Pools into and from ArtemisServerImpl https://issues.apache.org/jira/browse/ARTEMIS-822
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7eadff76 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7eadff76 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7eadff76 Branch: refs/heads/master Commit: 7eadff76818546aa6045be2eeb2e6aef60992394 Parents: 6afde8f Author: Clebert Suconic <[email protected]> Authored: Fri Oct 28 11:11:59 2016 -0400 Committer: Clebert Suconic <[email protected]> Committed: Mon Oct 31 11:34:27 2016 -0400 ---------------------------------------------------------------------- .../cli/commands/tools/XmlDataExporter.java | 14 +++--- .../journal/JMSJournalStorageManagerImpl.java | 6 ++- .../jms/server/impl/JMSServerManagerImpl.java | 15 +++--- .../artemis/core/journal/impl/JournalImpl.java | 49 +++++++++++--------- .../journal/AbstractJournalStorageManager.java | 10 +++- .../impl/journal/JDBCJournalStorageManager.java | 6 ++- .../impl/journal/JournalStorageManager.java | 20 ++++---- .../artemis/core/server/ActiveMQServer.java | 2 + .../core/server/impl/ActiveMQServerImpl.java | 12 +++-- .../journal/NIOJournalCompactTest.java | 19 ++++++-- .../DeleteMessagesOnStartupTest.java | 2 +- .../integration/persistence/RestartSMTest.java | 2 +- .../persistence/StorageManagerTestBase.java | 6 +-- .../replication/ReplicationTest.java | 2 +- .../server/SuppliedThreadPoolTest.java | 2 + .../journal/impl/AlignedJournalImplTest.java | 2 - .../core/journal/impl/JournalImplTestUnit.java | 6 --- .../impl/DuplicateDetectionUnitTest.java | 10 ++-- 18 files changed, 108 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java index a0e6c1e..8030ce2 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java @@ -90,6 +90,7 @@ import org.apache.activemq.artemis.jms.persistence.impl.journal.JMSJournalStorag import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.OrderedExecutorFactory; @Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.") public final class XmlDataExporter extends OptionalLocking { @@ -142,15 +143,10 @@ public final class XmlDataExporter extends OptionalLocking { String pagingDir, String largeMessagesDir) throws Exception { config = new ConfigurationImpl().setBindingsDirectory(bindingsDir).setJournalDirectory(journalDir).setPagingDirectory(pagingDir).setLargeMessagesDirectory(largeMessagesDir).setJournalType(JournalType.NIO); - final ExecutorService executor = Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory()); - ExecutorFactory executorFactory = new ExecutorFactory() { - @Override - public Executor getExecutor() { - return executor; - } - }; + final ExecutorService executor = Executors.newFixedThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()); + ExecutorFactory executorFactory = new OrderedExecutorFactory(executor); - storageManager = new JournalStorageManager(config, executorFactory); + storageManager = new JournalStorageManager(config, executorFactory, executorFactory); XMLOutputFactory factory = XMLOutputFactory.newInstance(); XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out, "UTF-8"); @@ -158,6 +154,8 @@ public final class XmlDataExporter extends OptionalLocking { xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(), new Class[]{XMLStreamWriter.class}, handler); writeXMLData(); + + executor.shutdown(); } private void writeXMLData() throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java index 32c438d..0aaa1a6 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java @@ -40,6 +40,7 @@ import org.apache.activemq.artemis.jms.persistence.config.PersistedBindings; import org.apache.activemq.artemis.jms.persistence.config.PersistedConnectionFactory; import org.apache.activemq.artemis.jms.persistence.config.PersistedDestination; import org.apache.activemq.artemis.jms.persistence.config.PersistedType; +import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.IDGenerator; public final class JMSJournalStorageManagerImpl implements JMSStorageManager { @@ -73,7 +74,8 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager { // Static -------------------------------------------------------- // Constructors -------------------------------------------------- - public JMSJournalStorageManagerImpl(final IDGenerator idGenerator, + public JMSJournalStorageManagerImpl(ExecutorFactory ioExecutors, + final IDGenerator idGenerator, final Configuration config, final ReplicationManager replicator) { if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO) { @@ -86,7 +88,7 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager { SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1); - Journal localJMS = new JournalImpl(1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1); + Journal localJMS = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1, 0); if (replicator != null) { jmsJournal = new ReplicatedJournal((byte) 2, localJMS, replicator); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java index dfa9218..456bb58 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java @@ -1544,16 +1544,13 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback * @throws Exception */ private void createJournal() throws Exception { - if (storage == null) { - if (coreConfig.isPersistenceEnabled()) { - storage = new JMSJournalStorageManagerImpl(new TimeAndCounterIDGenerator(), server.getConfiguration(), server.getReplicationManager()); - } else { - storage = new NullJMSStorageManagerImpl(); - } + if (storage != null) { + storage.stop(); + } + if (coreConfig.isPersistenceEnabled()) { + storage = new JMSJournalStorageManagerImpl(server.getIOExecutorFactory(), new TimeAndCounterIDGenerator(), server.getConfiguration(), server.getReplicationManager()); } else { - if (storage.isStarted()) { - storage.stop(); - } + storage = new NullJMSStorageManagerImpl(); } storage.start(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 983bd7d..b1093ed 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -74,6 +75,7 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.artemis.utils.DataConstants; +import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.SimpleFuture; import org.jboss.logging.Logger; @@ -185,8 +187,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal private ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<>(); - private final OrderedExecutorFactory providedIOThreadPool; - protected OrderedExecutorFactory ioExecutorFactory; + private final ExecutorFactory providedIOThreadPool; + protected ExecutorFactory ioExecutorFactory; private ThreadPoolExecutor threadPool; /** @@ -234,7 +236,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal this(null, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, userVersion); } - public JournalImpl(final OrderedExecutorFactory ioExecutors, + public JournalImpl(final ExecutorFactory ioExecutors, final int fileSize, final int minFiles, final int poolSize, @@ -744,7 +746,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal if (result != null) { result.fail(e); } - ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + logger.error("appendAddRecord::" + e, e); } finally { pendingRecords.remove(id); journalLock.readLock().unlock(); @@ -801,7 +803,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal if (result != null) { result.fail(e); } - ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + logger.error("appendUpdateRecord:" + e, e); } finally { journalLock.readLock().unlock(); } @@ -851,7 +853,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal if (result != null) { result.fail(e); } - ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + logger.error("appendDeleteRecord:" + e, e); } finally { journalLock.readLock().unlock(); } @@ -899,7 +901,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal tx.addPositive(usedFile, id, addRecord.getEncodeSize()); } catch (Exception e) { - ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + logger.error("appendAddRecordTransactional:" + e, e); setErrorCondition(tx, e); } finally { journalLock.readLock().unlock(); @@ -979,7 +981,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() ); } catch ( Exception e ) { - ActiveMQJournalLogger.LOGGER.error( e.getMessage(), e ); + logger.error("appendUpdateRecordTransactional:" + e.getMessage(), e ); setErrorCondition( tx, e ); } finally { journalLock.readLock().unlock(); @@ -1016,7 +1018,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal tx.addNegative(usedFile, id); } catch (Exception e) { - ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + logger.error("appendDeleteRecordTransactional:" + e, e); setErrorCondition(tx, e); } finally { journalLock.readLock().unlock(); @@ -1069,7 +1071,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal if (result != null) { result.fail(e); } - ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + logger.error("appendPrepareRecord:" + e, e); setErrorCondition(tx, e); } finally { journalLock.readLock().unlock(); @@ -1142,7 +1144,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal if (result != null) { result.fail(e); } - ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + logger.error("appendCommitRecord:" + e, e); setErrorCondition(tx, e); } finally { journalLock.readLock().unlock(); @@ -1185,7 +1187,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal if (result != null) { result.fail(e); } - ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + logger.error("appendRollbackRecord:" + e, e); setErrorCondition(tx, e); } finally { journalLock.readLock().unlock(); @@ -2067,7 +2069,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal public void flush() throws Exception { fileFactory.flush(); - flushExecutor(appendExecutor); flushExecutor(filesExecutor); @@ -2081,16 +2082,21 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // Send something to the closingExecutor, just to make sure we went until its end final CountDownLatch latch = new CountDownLatch(1); - executor.execute(new Runnable() { + try { + executor.execute(new Runnable() { - @Override - public void run() { - latch.countDown(); - } + @Override + public void run() { + latch.countDown(); + } - }); - latch.await(10, TimeUnit.SECONDS); + }); + latch.await(10, TimeUnit.SECONDS); + } catch (RejectedExecutionException ignored ) { + // this is fine + } } + } @Override @@ -2243,7 +2249,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal @Override public synchronized void stop() throws Exception { if (state == JournalState.STOPPED) { - throw new IllegalStateException("Journal is already stopped"); + return; } setJournalState(JournalState.STOPPED); @@ -2905,6 +2911,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal try { scheduleCompactAndBlock(60); } catch (Exception e) { + e.printStackTrace(); throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 768be45..ecaa86e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -146,6 +146,8 @@ public abstract class AbstractJournalStorageManager implements StorageManager { protected BatchingIDGenerator idGenerator; + protected final ExecutorFactory ioExecutors; + protected final ScheduledExecutorService scheduledExecutorService; protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true); @@ -186,18 +188,22 @@ public abstract class AbstractJournalStorageManager implements StorageManager { public AbstractJournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, - final ScheduledExecutorService scheduledExecutorService) { - this(config, executorFactory, scheduledExecutorService, null); + final ScheduledExecutorService scheduledExecutorService, + final ExecutorFactory ioExecutors) { + this(config, executorFactory, scheduledExecutorService, ioExecutors, null); } public AbstractJournalStorageManager(Configuration config, ExecutorFactory executorFactory, ScheduledExecutorService scheduledExecutorService, + ExecutorFactory ioExecutors, IOCriticalErrorListener criticalErrorListener) { this.executorFactory = executorFactory; this.ioCriticalErrorListener = criticalErrorListener; + this.ioExecutors = ioExecutors; + this.scheduledExecutorService = scheduledExecutorService; this.config = config; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java index d97f988..e4d401b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java @@ -36,15 +36,17 @@ public class JDBCJournalStorageManager extends JournalStorageManager { public JDBCJournalStorageManager(Configuration config, ExecutorFactory executorFactory, + ExecutorFactory ioExecutorFactory, ScheduledExecutorService scheduledExecutorService) { - super(config, executorFactory, scheduledExecutorService); + super(config, executorFactory, scheduledExecutorService, ioExecutorFactory); } public JDBCJournalStorageManager(final Configuration config, final ScheduledExecutorService scheduledExecutorService, final ExecutorFactory executorFactory, + final ExecutorFactory ioExecutorFactory, final IOCriticalErrorListener criticalErrorListener) { - super(config, executorFactory, scheduledExecutorService, criticalErrorListener); + super(config, executorFactory, scheduledExecutorService, ioExecutorFactory, criticalErrorListener); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 2d8411a..24650e1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -86,25 +86,28 @@ public class JournalStorageManager extends AbstractJournalStorageManager { public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, - final ScheduledExecutorService scheduledExecutorService) { - this(config, executorFactory, scheduledExecutorService, null); + final ScheduledExecutorService scheduledExecutorService, + final ExecutorFactory ioExecutors) { + this(config, executorFactory, scheduledExecutorService, ioExecutors, null); } - public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory) { - this(config, executorFactory, null, null); + public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, final ExecutorFactory ioExecutors) { + this(config, executorFactory, null, ioExecutors, null); } public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, final ScheduledExecutorService scheduledExecutorService, + final ExecutorFactory ioExecutors, final IOCriticalErrorListener criticalErrorListener) { - super(config, executorFactory, scheduledExecutorService, criticalErrorListener); + super(config, executorFactory, scheduledExecutorService, ioExecutors, criticalErrorListener); } public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, + final ExecutorFactory ioExecutors, final IOCriticalErrorListener criticalErrorListener) { - super(config, executorFactory, null, criticalErrorListener); + super(config, executorFactory, null, ioExecutors, criticalErrorListener); } @Override @@ -116,7 +119,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO()); - Journal localBindings = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1); + Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1, 0); bindingsJournal = localBindings; originalBindingsJournal = localBindings; @@ -132,7 +135,8 @@ public class JournalStorageManager extends AbstractJournalStorageManager { throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType()); } - Journal localMessage = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO()); + Journal localMessage = new JournalImpl(ioExecutors, config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO(), 0); + messageJournal = localMessage; originalMessageJournal = localMessage; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index 477f839..a43fec8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -345,6 +345,8 @@ public interface ActiveMQServer extends ActiveMQComponent { ExecutorFactory getExecutorFactory(); + ExecutorFactory getIOExecutorFactory(); + void setGroupingHandler(GroupingHandler groupingHandler); GroupingHandler getGroupingHandler(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 6288bdf..d2de964 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -232,8 +232,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { private volatile ExecutorFactory executorFactory; - private volatile ExecutorService ioExecutorPool; + /** * This is a thread pool for io tasks only. * We can't use the same global executor to avoid starvations. @@ -1637,6 +1637,11 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override + public ExecutorFactory getIOExecutorFactory() { + return ioExecutorFactory; + } + + @Override public void setGroupingHandler(final GroupingHandler groupingHandler) { if (this.groupingHandler != null && managementService != null) { // Removing old groupNotification @@ -1770,10 +1775,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { private StorageManager createStorageManager() { if (configuration.isPersistenceEnabled()) { if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) { - return new JDBCJournalStorageManager(configuration, getScheduledPool(), executorFactory, shutdownOnCriticalIO); + return new JDBCJournalStorageManager(configuration, getScheduledPool(), executorFactory, ioExecutorFactory, shutdownOnCriticalIO); } else { // Default to File Based Storage Manager, (Legacy default configuration). - return new JournalStorageManager(configuration, executorFactory, scheduledPool, shutdownOnCriticalIO); + return new JournalStorageManager(configuration, executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO); } } return new NullStorageManager(); @@ -1847,6 +1852,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { }); this.ioExecutorPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), tFactory); + this.ioExecutorFactory = new OrderedExecutorFactory(ioExecutorPool); } /* We check to see if a Scheduled Executor Service is provided in the InjectedObjectRegistry. If so we use this http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java index 519ffb5..42c48f3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java @@ -1623,11 +1623,15 @@ public class NIOJournalCompactTest extends JournalImplTestBase { final ExecutorService executor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()); + final ExecutorService ioexecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()); + OrderedExecutorFactory factory = new OrderedExecutorFactory(executor); + OrderedExecutorFactory iofactory = new OrderedExecutorFactory(ioexecutor); + final ExecutorService deleteExecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()); - final JournalStorageManager storage = new JournalStorageManager(config, factory); + final JournalStorageManager storage = new JournalStorageManager(config, factory, iofactory); storage.start(); @@ -1681,7 +1685,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase { for (long messageID : values) { storage.deleteMessage(messageID); } - } catch (Exception e) { + } catch (Throwable e) { e.printStackTrace(); errors.incrementAndGet(); } @@ -1733,11 +1737,17 @@ public class NIOJournalCompactTest extends JournalImplTestBase { deleteExecutor.shutdown(); - assertTrue("delete executor terminated", deleteExecutor.awaitTermination(30, TimeUnit.SECONDS)); + assertTrue("delete executor failted to terminate", deleteExecutor.awaitTermination(30, TimeUnit.SECONDS)); + + storage.stop(); executor.shutdown(); - assertTrue("executor terminated", executor.awaitTermination(10, TimeUnit.SECONDS)); + assertTrue("executor failed to terminate", executor.awaitTermination(30, TimeUnit.SECONDS)); + + ioexecutor.shutdown(); + + assertTrue("ioexecutor failed to terminate", ioexecutor.awaitTermination(30, TimeUnit.SECONDS)); } catch (Throwable e) { e.printStackTrace(); @@ -1751,6 +1761,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase { executor.shutdownNow(); deleteExecutor.shutdownNow(); + ioexecutor.shutdownNow(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java index 9848c39..7d515d8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java @@ -91,7 +91,7 @@ public class DeleteMessagesOnStartupTest extends StorageManagerTestBase { @Override protected JournalStorageManager createJournalStorageManager(Configuration configuration) { - return new JournalStorageManager(configuration, execFactory) { + return new JournalStorageManager(configuration, execFactory, execFactory) { @Override public void deleteMessage(final long messageID) throws Exception { deletedMessage.add(messageID); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java index 5828baf..49d3a12 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java @@ -65,7 +65,7 @@ public class RestartSMTest extends ActiveMQTestBase { PostOffice postOffice = new FakePostOffice(); - final JournalStorageManager journal = new JournalStorageManager(createDefaultInVMConfig(), execFactory); + final JournalStorageManager journal = new JournalStorageManager(createDefaultInVMConfig(), execFactory, execFactory); try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java index 814bf0d..a104363 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java @@ -137,7 +137,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase { * @param configuration */ protected JournalStorageManager createJournalStorageManager(Configuration configuration) { - JournalStorageManager jsm = new JournalStorageManager(configuration, execFactory); + JournalStorageManager jsm = new JournalStorageManager(configuration, execFactory, execFactory); addActiveMQComponent(jsm); return jsm; } @@ -146,7 +146,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase { * @param configuration */ protected JDBCJournalStorageManager createJDBCJournalStorageManager(Configuration configuration) { - JDBCJournalStorageManager jsm = new JDBCJournalStorageManager(configuration, execFactory, scheduledExecutorService); + JDBCJournalStorageManager jsm = new JDBCJournalStorageManager(configuration, execFactory, execFactory, scheduledExecutorService); addActiveMQComponent(jsm); return jsm; } @@ -155,7 +155,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase { * @throws Exception */ protected void createJMSStorage() throws Exception { - jmsJournal = new JMSJournalStorageManagerImpl(new TimeAndCounterIDGenerator(), createDefaultInVMConfig(), null); + jmsJournal = new JMSJournalStorageManagerImpl(null, new TimeAndCounterIDGenerator(), createDefaultInVMConfig(), null); addActiveMQComponent(jmsJournal); jmsJournal.start(); jmsJournal.load(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index 7d2d514..1ae9527 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -435,7 +435,7 @@ public final class ReplicationTest extends ActiveMQTestBase { * @throws Exception */ private JournalStorageManager getStorage() throws Exception { - return new JournalStorageManager(createDefaultInVMConfig(), factory); + return new JournalStorageManager(createDefaultInVMConfig(), factory, factory); } /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SuppliedThreadPoolTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SuppliedThreadPoolTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SuppliedThreadPoolTest.java index 65cd6b9..1deb1bb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SuppliedThreadPoolTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SuppliedThreadPoolTest.java @@ -44,6 +44,7 @@ public class SuppliedThreadPoolTest extends ActiveMQTestBase { public void setup() throws Exception { serviceRegistry = new ServiceRegistryImpl(); serviceRegistry.setExecutorService(Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory())); + serviceRegistry.setIOExecutorService(Executors.newFixedThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory())); serviceRegistry.setScheduledExecutorService(Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory())); server = new ActiveMQServerImpl(null, null, null, null, serviceRegistry); server.start(); @@ -58,6 +59,7 @@ public class SuppliedThreadPoolTest extends ActiveMQTestBase { } serviceRegistry.getExecutorService().shutdown(); serviceRegistry.getScheduledExecutorService().shutdown(); + serviceRegistry.getIOExecutorService().shutdown(); super.tearDown(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java index 2b24296..be6e5b3 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java @@ -943,8 +943,6 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { journalImpl.checkReclaimStatus(); journalImpl.flush(); - Assert.assertEquals(2, factory.listFiles("tt").size()); - } @Test http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java index eb815ae..3be030d 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java @@ -62,12 +62,6 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { } catch (IllegalStateException e) { // OK } - try { - stopJournal(); - Assert.fail("Should throw exception"); - } catch (IllegalStateException e) { - // OK - } startJournal(); try { startJournal(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java index fcd32c5..96fa35c 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java @@ -70,7 +70,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase { @Before public void setUp() throws Exception { super.setUp(); - executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()); + executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory()); factory = new OrderedExecutorFactory(executor); } @@ -92,7 +92,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory()); - journal = new JournalStorageManager(configuration, factory); + journal = new JournalStorageManager(configuration, factory, factory); journal.start(); journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>()); @@ -112,7 +112,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase { journal.stop(); - journal = new JournalStorageManager(configuration, factory); + journal = new JournalStorageManager(configuration, factory, factory); journal.start(); journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>()); @@ -135,7 +135,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase { mapDups.clear(); - journal = new JournalStorageManager(configuration, factory); + journal = new JournalStorageManager(configuration, factory, factory); journal.start(); journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>()); @@ -146,6 +146,8 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase { values = mapDups.get(ADDRESS); Assert.assertEquals(10, values.size()); + + scheduledThreadPool.shutdown(); } finally { if (journal != null) { try {
