http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index d245985..fcdff05 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.server.management.ManagementService; @@ -62,7 +63,6 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.XATransactionId; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdapter; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange; @@ -261,7 +261,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No public void sendReply(final OpenWireConnection connection, final Command command) { - server.getStorageManager().afterCompleteOperations(new IOAsyncTask() + server.getStorageManager().afterCompleteOperations(new IOCallback() { public void onError(final int errorCode, final String errorMessage) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index a4e8ea5..b3d926c 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -33,7 +33,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; @@ -357,7 +357,7 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No public void sendReply(final StompConnection connection, final StompFrame frame) { - server.getStorageManager().afterCompleteOperations(new IOAsyncTask() + server.getStorageManager().afterCompleteOperations(new IOCallback() { public void onError(final int errorCode, final String errorMessage) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 8efeabb..c1b767a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -41,7 +41,7 @@ import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfigu import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; import org.apache.activemq.artemis.core.config.impl.Validators; -import org.apache.activemq.artemis.core.journal.impl.AIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; import org.apache.activemq.artemis.core.journal.impl.JournalConstants; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java index ee3bb86..8e06cde 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java @@ -26,7 +26,7 @@ import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeLis * <PRE> * * +--------------+ 1 +----------------+ N +--------------+ N +--------+ 1 +-------------------+ - * | {@link org.apache.activemq.artemis.core.postoffice.PostOffice} |-------> |{@link PagingManager}|-------> |{@link PagingStore} | ------> | {@link org.apache.activemq.artemis.core.paging.impl.Page} | ------> | {@link org.apache.activemq.artemis.core.journal.SequentialFile} | + * | {@link org.apache.activemq.artemis.core.postoffice.PostOffice} |-------> |{@link PagingManager}|-------> |{@link PagingStore} | ------> | {@link org.apache.activemq.artemis.core.paging.impl.Page} | ------> | {@link SequentialFile} | * +--------------+ +----------------+ +--------------+ +--------+ +-------------------+ * | 1 ^ * | | http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index 3b74e45..b0129e7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -158,7 +158,7 @@ public interface PagingStore extends ActiveMQComponent /** * Sends the pages with given IDs to the {@link ReplicationManager}. * <p> - * Sending is done here to avoid exposing the internal {@link org.apache.activemq.artemis.core.journal.SequentialFile}s. + * Sending is done here to avoid exposing the internal {@link SequentialFile}s. * * @param replicator * @param pageIds http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java index f83966d..0818034 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java @@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.paging; import java.util.List; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index 39324da..e98da75 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -32,8 +32,8 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingStore; @@ -521,7 +521,7 @@ final class PageSubscriptionImpl implements PageSubscription store.storeCursorAcknowledge(cursorId, position); } - store.afterCompleteOperations(new IOAsyncTask() + store.afterCompleteOperations(new IOCallback() { volatile String error = ""; @@ -541,7 +541,7 @@ final class PageSubscriptionImpl implements PageSubscription @Override public String toString() { - return IOAsyncTask.class.getSimpleName() + "(" + PageSubscriptionImpl.class.getSimpleName() + ") " + error; + return IOCallback.class.getSimpleName() + "(" + PageSubscriptionImpl.class.getSimpleName() + ") " + error; } }); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java index 24d60df..ae41bb0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java @@ -25,8 +25,8 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.journal.SequentialFile; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.cursor.LivePageCache; import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java index 3a12b96..63681f2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java @@ -29,9 +29,9 @@ import java.util.List; import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.journal.IOCriticalErrorListener; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; -import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStoreFactory; @@ -208,6 +208,6 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory private SequentialFileFactory newFileFactory(final String directoryName) { - return new NIOSequentialFileFactory(new File(directory, directoryName), false, critialErrorListener); + return new NIOSequentialFileFactory(new File(directory, directoryName), false, critialErrorListener, 1); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 3b0a139..6b67f3e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -37,8 +37,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.journal.SequentialFile; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; @@ -576,7 +576,7 @@ public class PagingStoreImpl implements PagingStore public boolean checkPageFileExists(final int pageNumber) { String fileName = createFileName(pageNumber); - SequentialFile file = fileFactory.createSequentialFile(fileName, 1); + SequentialFile file = fileFactory.createSequentialFile(fileName); return file.exists(); } @@ -589,7 +589,7 @@ public class PagingStoreImpl implements PagingStore fileFactory = storeFactory.newFileFactory(getStoreName()); } - SequentialFile file = fileFactory.createSequentialFile(fileName, 1000); + SequentialFile file = fileFactory.createSequentialFile(fileName); Page page = new Page(storeName, storageManager, fileFactory, file, pageNumber); @@ -1227,7 +1227,7 @@ public class PagingStoreImpl implements PagingStore { for (Integer id : pageIds) { - SequentialFile sFile = fileFactory.createSequentialFile(createFileName(id), 1); + SequentialFile sFile = fileFactory.createSequentialFile(createFileName(id)); if (!sFile.exists()) { continue; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java index 2e1dd73..d13f311 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java @@ -16,7 +16,7 @@ */ package org.apache.activemq.artemis.core.persistence; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.journal.IOCompletion; /** @@ -28,7 +28,7 @@ public interface OperationContext extends IOCompletion { /** Execute the task when all IO operations are complete, * Or execute it immediately if nothing is pending. */ - void executeOnCompletion(IOAsyncTask runnable); + void executeOnCompletion(IOCallback runnable); void replicationLineUp(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index 5196ccc..7cf8112 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -25,10 +25,10 @@ import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; -import org.apache.activemq.artemis.core.journal.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagedMessage; @@ -99,7 +99,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent void pageWrite(PagedMessage message, int pageNumber); - void afterCompleteOperations(IOAsyncTask run); + void afterCompleteOperations(IOCallback run); /** * Block until the operations are done. http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java index 3c25164..210877c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java @@ -31,12 +31,12 @@ import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback; -import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl; import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl; import org.apache.activemq.artemis.core.persistence.impl.journal.BatchingIDGenerator.IDCounterEncoding; @@ -109,7 +109,7 @@ public final class DescribeJournal public static void describeBindingsJournal(final File bindingsDir) throws Exception { - SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir, null); + SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir, null, 1); JournalImpl bindings = new JournalImpl(1024 * 1024, 2, -1, 0, bindingsFF, "activemq-bindings", "bindings", 1); describeJournal(bindingsFF, bindings, bindingsDir); @@ -118,7 +118,7 @@ public final class DescribeJournal public static DescribeJournal describeMessagesJournal(final File messagesDir) throws Exception { - SequentialFileFactory messagesFF = new NIOSequentialFileFactory(messagesDir, null); + SequentialFileFactory messagesFF = new NIOSequentialFileFactory(messagesDir, null, 1); // Will use only default values. The load function should adapt to anything different ConfigurationImpl defaultValues = new ConfigurationImpl(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 5104b68..33a1cbe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -53,22 +53,22 @@ import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.journal.EncodingSupport; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; -import org.apache.activemq.artemis.core.journal.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.journal.SequentialFile; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; -import org.apache.activemq.artemis.core.journal.impl.AIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagedMessage; @@ -236,7 +236,9 @@ public class JournalStorageManager implements StorageManager } - SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener); + SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), + criticalErrorListener, + config.getJournalMaxIO_NIO()); Journal localBindings = new JournalImpl(1024 * 1024, 2, @@ -261,6 +263,7 @@ public class JournalStorageManager implements StorageManager journalFF = new AIOSequentialFileFactory(config.getJournalLocation(), config.getJournalBufferSize_AIO(), config.getJournalBufferTimeout_AIO(), + config.getJournalMaxIO_AIO(), config.isLogJournalWriteRate(), criticalErrorListener); } @@ -271,6 +274,7 @@ public class JournalStorageManager implements StorageManager true, config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), + config.getJournalMaxIO_NIO(), config.isLogJournalWriteRate(), criticalErrorListener); } @@ -296,7 +300,8 @@ public class JournalStorageManager implements StorageManager largeMessagesDirectory = config.getLargeMessagesDirectory(); - largeMessagesFactory = new NIOSequentialFileFactory(config.getLargeMessagesLocation(), false, criticalErrorListener); + largeMessagesFactory = new NIOSequentialFileFactory(config.getLargeMessagesLocation(), false, criticalErrorListener, + 1); perfBlastPages = config.getJournalPerfBlastPages(); @@ -572,7 +577,7 @@ public class JournalStorageManager implements StorageManager String fileName = entry.getValue().getA(); final long id = entry.getKey(); long size = entry.getValue().getB(); - SequentialFile seqFile = largeMessagesFactory.createSequentialFile(fileName, 1); + SequentialFile seqFile = largeMessagesFactory.createSequentialFile(fileName); if (!seqFile.exists()) continue; replicator.syncLargeMessageFile(seqFile, size, id); @@ -608,7 +613,7 @@ public class JournalStorageManager implements StorageManager if (!largeMessagesToDelete.contains(id)) { idList.add(id); - SequentialFile seqFile = largeMessagesFactory.createSequentialFile(filename, 1); + SequentialFile seqFile = largeMessagesFactory.createSequentialFile(filename); long size = seqFile.size(); largeMessages.put(id, new Pair<String, Long>(filename, size)); } @@ -747,7 +752,7 @@ public class JournalStorageManager implements StorageManager return new OperationContextImpl(executor1); } - public void afterCompleteOperations(final IOAsyncTask run) + public void afterCompleteOperations(final IOCallback run) { getContext().executeOnCompletion(run); } @@ -2498,7 +2503,7 @@ public class JournalStorageManager implements StorageManager public SequentialFile createFileForLargeMessage(final long messageID, LargeMessageExtension extension) { - return largeMessagesFactory.createSequentialFile(messageID + extension.getExtension(), -1); + return largeMessagesFactory.createSequentialFile(messageID + extension.getExtension()); } @@ -2788,7 +2793,7 @@ public class JournalStorageManager implements StorageManager List<String> tmpFiles = largeMessagesFactory.listFiles("tmp"); for (String tmpFile : tmpFiles) { - SequentialFile file = largeMessagesFactory.createSequentialFile(tmpFile, -1); + SequentialFile file = largeMessagesFactory.createSequentialFile(tmpFile); file.delete(); } } @@ -2830,7 +2835,7 @@ public class JournalStorageManager implements StorageManager return DummyOperationContext.instance; } - public void executeOnCompletion(final IOAsyncTask runnable) + public void executeOnCompletion(final IOCallback runnable) { // There are no executeOnCompletion calls while using the DummyOperationContext // However we keep the code here for correctness http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index dc09160..c4aa058 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -24,7 +24,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.core.journal.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.message.BodyEncoder; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LargeServerMessage; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java index 76a8653..b034660 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java @@ -20,7 +20,7 @@ import java.nio.ByteBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.core.journal.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager.LargeMessageExtension; import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage; @@ -62,7 +62,7 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage buffer.rewind(); int bytesRead = appendFile.read(buffer); if (bytesRead > 0) - mainSeqFile.writeInternal(buffer); + mainSeqFile.writeDirect(buffer, false); if (bytesRead < buffer.capacity()) { break; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java index 32eca26..a8e8e2a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java @@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; @@ -134,7 +134,7 @@ public class OperationContextImpl implements OperationContext checkTasks(); } - public void executeOnCompletion(final IOAsyncTask completion) + public void executeOnCompletion(final IOCallback completion) { if (errorCode != -1) { @@ -219,7 +219,7 @@ public class OperationContextImpl implements OperationContext /** * @param task */ - private void execute(final IOAsyncTask task) + private void execute(final IOCallback task) { executorsPending.incrementAndGet(); try @@ -243,7 +243,7 @@ public class OperationContextImpl implements OperationContext } catch (Throwable e) { - ActiveMQServerLogger.LOGGER.errorExecutingIOAsyncTask(e); + ActiveMQServerLogger.LOGGER.errorExecutingAIOCallback(e); executorsPending.decrementAndGet(); task.onError(ActiveMQExceptionType.INTERNAL_ERROR.getCode(), "It wasn't possible to complete IO operation - " + e.getMessage()); @@ -296,9 +296,9 @@ public class OperationContextImpl implements OperationContext final int replicationLined; final int pageLined; - final IOAsyncTask task; + final IOCallback task; - TaskHolder(final IOAsyncTask task) + TaskHolder(final IOCallback task) { storeLined = storeLineUp.intValue(); replicationLined = replicationLineUp.intValue(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java index 303e257..25cd0fc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java @@ -17,7 +17,7 @@ package org.apache.activemq.artemis.core.persistence.impl.nullpm; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; -import org.apache.activemq.artemis.core.journal.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index ade2173..d6336d2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -27,10 +27,10 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; -import org.apache.activemq.artemis.core.journal.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagedMessage; @@ -112,7 +112,7 @@ public class NullStorageManager implements StorageManager } @Override - public void executeOnCompletion(final IOAsyncTask runnable) + public void executeOnCompletion(final IOCallback runnable) { runnable.done(); } @@ -359,7 +359,7 @@ public class NullStorageManager implements StorageManager } @Override - public void afterCompleteOperations(final IOAsyncTask run) + public void afterCompleteOperations(final IOCallback run) { run.done(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 12bf669..71a2458 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -41,8 +41,8 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.NotificationType; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; @@ -1179,7 +1179,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding { // This will use the same thread if there are no pending operations // avoiding a context switch on this case - storageManager.afterCompleteOperations(new IOAsyncTask() + storageManager.afterCompleteOperations(new IOCallback() { public void onError(final int errorCode, final String errorMessage) { @@ -1249,7 +1249,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding queues.addAll(durableQueues); queues.addAll(nonDurableQueues); - storageManager.afterCompleteOperations(new IOAsyncTask() + storageManager.afterCompleteOperations(new IOCallback() { public void onError(int errorCode, String errorMessage) http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 109d4fd..a39950e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -58,8 +58,8 @@ import javax.transaction.xa.Xid; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.exception.ActiveMQXAException; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage; @@ -614,7 +614,7 @@ public class ServerSessionPacketHandler implements ChannelHandler final boolean flush, final boolean closeChannel) { - storageManager.afterCompleteOperations(new IOAsyncTask() + storageManager.afterCompleteOperations(new IOCallback() { public void onError(final int errorCode, final String errorMessage) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java index 2723198..d96dbe1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java @@ -26,7 +26,7 @@ import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index ec16825..afb1e77 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -34,11 +34,11 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; -import org.apache.activemq.artemis.core.journal.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.Journal.JournalState; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; -import org.apache.activemq.artemis.core.journal.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.impl.FileWrapperJournal; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.paging.PagedMessage; @@ -546,7 +546,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon if (!channel1.isOpen()) { - channel1.open(1, false); + channel1.open(); } channel1.writeDirect(ByteBuffer.wrap(data), true); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index a2f39c9..905d7a5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -33,7 +33,7 @@ import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.core.journal.EncodingSupport; -import org.apache.activemq.artemis.core.journal.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.persistence.OperationContext; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index b136990..11e6f61 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -43,10 +43,10 @@ import io.netty.channel.Channel; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.config.Configuration; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; -import org.apache.activemq.artemis.core.journal.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; @@ -522,8 +522,8 @@ public interface ActiveMQServerLogger extends BasicLogger void lareMessageErrorCopying(@Cause Exception e, LargeServerMessage largeServerMessage); @LogMessage(level = Logger.Level.WARN) - @Message(id = 222054, value = "Error on executing IOAsyncTask", format = Message.Format.MESSAGE_FORMAT) - void errorExecutingIOAsyncTask(@Cause Throwable t); + @Message(id = 222054, value = "Error on executing IOCallback", format = Message.Format.MESSAGE_FORMAT) + void errorExecutingAIOCallback(@Cause Throwable t); @LogMessage(level = Logger.Level.WARN) @Message(id = 222055, value = "Error on deleting duplicate cache", format = Message.Format.MESSAGE_FORMAT) @@ -1200,7 +1200,7 @@ public interface ActiveMQServerLogger extends BasicLogger @LogMessage(level = Logger.Level.ERROR) @Message(id = 224007, value = "page subscription = {0} error={1}", format = Message.Format.MESSAGE_FORMAT) - void pageSubscriptionError(IOAsyncTask ioAsyncTask, String error); + void pageSubscriptionError(IOCallback IOCallback, String error); @LogMessage(level = Logger.Level.ERROR) @Message(id = 224008, value = "Failed to store id", format = Message.Format.MESSAGE_FORMAT) http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java index 57120ff..e22a172 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java @@ -17,7 +17,7 @@ package org.apache.activemq.artemis.core.server; import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.core.journal.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java index 3c0ba41..106158c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java @@ -22,8 +22,8 @@ import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.server.Consumer; @@ -247,7 +247,7 @@ public class Redistributor implements Consumer tx.commit(); - storageManager.afterCompleteOperations(new IOAsyncTask() + storageManager.afterCompleteOperations(new IOCallback() { public void onError(final int errorCode, final String errorMessage) http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AIOFileLockNodeManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AIOFileLockNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AIOFileLockNodeManager.java index 7adee2e..2d0f3c0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AIOFileLockNodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AIOFileLockNodeManager.java @@ -17,10 +17,11 @@ package org.apache.activemq.artemis.core.server.impl; import java.io.File; -import java.io.IOException; import java.nio.channels.FileLock; -import org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl; +import org.apache.activemq.artemis.core.io.aio.ActiveMQFileLock; +import org.apache.activemq.artemis.jlibaio.LibaioContext; +import org.apache.activemq.artemis.jlibaio.LibaioFile; /** * This is using the ActiveMQ Artemis Libaio Native to perform calls to flock on a Linux system. At the @@ -56,19 +57,15 @@ public final class AIOFileLockNodeManager extends FileLockNodeManager { File file = newFileForRegionLock(lockPos); - int handle = AsynchronousFileImpl.openFile(file.getAbsolutePath()); + LibaioFile fileControl = LibaioContext.openControlFile(file.getAbsolutePath(), false); - if (handle < 0) + if (!fileControl.lock()) { - throw new IOException("couldn't open file " + file.getAbsolutePath()); + fileControl.close(); + return null; } - FileLock lock = AsynchronousFileImpl.lock(handle); - - if (lock == null) - { - AsynchronousFileImpl.closeFile(handle); - } + FileLock lock = new ActiveMQFileLock(fileControl); return lock; @@ -83,21 +80,13 @@ public final class AIOFileLockNodeManager extends FileLockNodeManager while (!interrupted) { - int handle = AsynchronousFileImpl.openFile(file.getAbsolutePath()); - - if (handle < 0) - { - throw new IOException("couldn't open file " + file.getAbsolutePath()); - } - - FileLock lockFile = AsynchronousFileImpl.lock(handle); + FileLock lockFile = tryLock(liveLockPos); if (lockFile != null) { return lockFile; } else { - AsynchronousFileImpl.closeFile(handle); try { Thread.sleep(500); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 0dc85de..9d7e95b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.server.impl; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.Configuration; @@ -29,11 +28,10 @@ import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.impl.FilterImpl; -import org.apache.activemq.artemis.core.journal.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; -import org.apache.activemq.artemis.core.journal.SequentialFile; -import org.apache.activemq.artemis.core.journal.impl.AIOSequentialFileFactory; -import org.apache.activemq.artemis.core.journal.impl.SyncSpeedTest; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; @@ -96,6 +94,7 @@ import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl; import org.apache.activemq.artemis.core.version.Version; +import org.apache.activemq.artemis.jlibaio.LibaioContext; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; @@ -359,7 +358,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { manager = new InVMNodeManager(replicatingBackup); } - else if (configuration.getJournalType() == JournalType.ASYNCIO && AsynchronousFileImpl.isLoaded()) + else if (configuration.getJournalType() == JournalType.ASYNCIO && LibaioContext.isLoaded()) { manager = new AIOFileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout()); } @@ -401,13 +400,6 @@ public class ActiveMQServerImpl implements ActiveMQServer ActiveMQServerLogger.LOGGER.serverStarting((haPolicy.isBackup() ? "backup" : "live"), configuration); - if (configuration.isRunSyncSpeedTest()) - { - SyncSpeedTest test = new SyncSpeedTest(); - - test.run(); - } - final boolean wasLive = !haPolicy.isBackup(); if (!haPolicy.isBackup()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 29a4509..92cdbf9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -45,8 +45,8 @@ import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PagedReference; @@ -2524,7 +2524,7 @@ public class QueueImpl implements Queue acknowledge(tx, ref); - storageManager.afterCompleteOperations(new IOAsyncTask() + storageManager.afterCompleteOperations(new IOCallback() { public void onError(final int errorCode, final String errorMessage) http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 98b6bb7..49a3926 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -39,11 +39,11 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ResourceNames; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl; import org.apache.activemq.artemis.core.exception.ActiveMQXAException; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.impl.FilterImpl; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; @@ -1395,7 +1395,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener public void close(final boolean failed) { if (closed) return; - context.executeOnCompletion(new IOAsyncTask() + context.executeOnCompletion(new IOCallback() { public void onError(int errorCode, String errorMessage) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java index 42aab20..357ce6f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java @@ -23,7 +23,7 @@ import java.util.Date; import java.util.List; import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Queue; @@ -197,7 +197,7 @@ public class TransactionImpl implements Transaction // We use the Callback even for non persistence // If we are using non-persistence with replication, the replication manager will have // to execute this runnable in the correct order - storageManager.afterCompleteOperations(new IOAsyncTask() + storageManager.afterCompleteOperations(new IOCallback() { public void onError(final int errorCode, final String errorMessage) @@ -266,7 +266,7 @@ public class TransactionImpl implements Transaction // to execute this runnable in the correct order // This also will only use a different thread if there are any IO pending. // If the IO finished early by the time we got here, we won't need an executor - storageManager.afterCompleteOperations(new IOAsyncTask() + storageManager.afterCompleteOperations(new IOCallback() { public void onError(final int errorCode, final String errorMessage) @@ -323,7 +323,7 @@ public class TransactionImpl implements Transaction // We use the Callback even for non persistence // If we are using non-persistence with replication, the replication manager will have // to execute this runnable in the correct order - storageManager.afterCompleteOperations(new IOAsyncTask() + storageManager.afterCompleteOperations(new IOCallback() { public void onError(final int errorCode, final String errorMessage) http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 26e9bb5..4142f1f 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -67,7 +67,6 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; -import org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; @@ -78,11 +77,11 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback; -import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.postoffice.Binding; @@ -117,6 +116,7 @@ import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivatio import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; +import org.apache.activemq.artemis.jlibaio.LibaioContext; import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl; import org.apache.activemq.artemis.utils.OrderedExecutorFactory; @@ -574,7 +574,7 @@ public abstract class ActiveMQTestBase extends Assert public static JournalType getDefaultJournalType() { - if (AsynchronousFileImpl.isLoaded()) + if (LibaioContext.isLoaded()) { return JournalType.ASYNCIO; } @@ -1894,7 +1894,7 @@ public abstract class ActiveMQTestBase extends Assert JournalImpl messagesJournal = null; try { - SequentialFileFactory messagesFF = new NIOSequentialFileFactory(new File(getJournalDir()), null); + SequentialFileFactory messagesFF = new NIOSequentialFileFactory(new File(getJournalDir()), null, 1); messagesJournal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), @@ -1940,7 +1940,7 @@ public abstract class ActiveMQTestBase extends Assert protected HashMap<Integer, AtomicInteger> countJournal(Configuration config) throws Exception { final HashMap<Integer, AtomicInteger> recordsType = new HashMap<Integer, AtomicInteger>(); - SequentialFileFactory messagesFF = new NIOSequentialFileFactory(config.getJournalLocation(), null); + SequentialFileFactory messagesFF = new NIOSequentialFileFactory(config.getJournalLocation(), null, 1); JournalImpl messagesJournal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), @@ -1988,7 +1988,7 @@ public abstract class ActiveMQTestBase extends Assert if (messageJournal) { - ff = new NIOSequentialFileFactory(config.getJournalLocation(), null); + ff = new NIOSequentialFileFactory(config.getJournalLocation(), null, 1); journal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), 0, @@ -2000,7 +2000,7 @@ public abstract class ActiveMQTestBase extends Assert } else { - ff = new NIOSequentialFileFactory(config.getBindingsLocation(), null); + ff = new NIOSequentialFileFactory(config.getBindingsLocation(), null, 1); journal = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), @@ -2403,31 +2403,31 @@ public abstract class ActiveMQTestBase extends Assert long timeout = System.currentTimeMillis() + 15000; - while (AsynchronousFileImpl.getTotalMaxIO() != 0 && System.currentTimeMillis() > timeout) - { - try - { - Thread.sleep(100); - } - catch (Exception ignored) - { - } - } - - int invmSize = InVMRegistry.instance.size(); - if (invmSize > 0) - { - InVMRegistry.instance.clear(); - log.info(threadDump("Thread dump")); - fail("invm registry still had acceptors registered"); - } - - final int totalMaxIO = AsynchronousFileImpl.getTotalMaxIO(); - if (totalMaxIO != 0) - { - AsynchronousFileImpl.resetMaxAIO(); - Assert.fail("test did not close all its files " + totalMaxIO); - } +// while (AsynchronousFileImpl.getTotalMaxIO() != 0 && System.currentTimeMillis() > timeout) +// { +// try +// { +// Thread.sleep(100); +// } +// catch (Exception ignored) +// { +// } +// } +// +// int invmSize = InVMRegistry.instance.size(); +// if (invmSize > 0) +// { +// InVMRegistry.instance.clear(); +// log.info(threadDump("Thread dump")); +// fail("invm registry still had acceptors registered"); +// } +// +// final int totalMaxIO = AsynchronousFileImpl.getTotalMaxIO(); +// if (totalMaxIO != 0) +// { +// AsynchronousFileImpl.resetMaxAIO(); +// Assert.fail("test did not close all its files " + totalMaxIO); +// } } private void cleanupPools() http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ColocatedActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ColocatedActiveMQServer.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ColocatedActiveMQServer.java index 4f9dd48..845c9da 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ColocatedActiveMQServer.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ColocatedActiveMQServer.java @@ -17,18 +17,17 @@ package org.apache.activemq.artemis.tests.util; import javax.management.MBeanServer; - import java.io.File; -import org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.impl.FileConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.impl.AIOFileLockNodeManager; -import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager; +import org.apache.activemq.artemis.jlibaio.LibaioContext; import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; @@ -69,7 +68,7 @@ public class ColocatedActiveMQServer extends ActiveMQServerImpl if (replicatingBackup) { NodeManager manager; - if (getConfiguration().getJournalType() == JournalType.ASYNCIO && AsynchronousFileImpl.isLoaded()) + if (getConfiguration().getJournalType() == JournalType.ASYNCIO && LibaioContext.isLoaded()) { return new AIOFileLockNodeManager(directory, replicatingBackup, getConfiguration().getJournalLockAcquisitionTimeout()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index a6bce80..4332fbd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -37,9 +37,9 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -480,7 +480,7 @@ public class HangConsumerTest extends ActiveMQTestBase server.stop(); - SequentialFileFactory messagesFF = new NIOSequentialFileFactory(server.getConfiguration().getBindingsLocation(), null); + SequentialFileFactory messagesFF = new NIOSequentialFileFactory(server.getConfiguration().getBindingsLocation(), null, 1); JournalImpl messagesJournal = new JournalImpl(1024 * 1024, 2, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JournalCrashTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JournalCrashTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JournalCrashTest.java index e71189f..cbc2904 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JournalCrashTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JournalCrashTest.java @@ -33,7 +33,7 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.Assert; @@ -221,7 +221,7 @@ public class JournalCrashTest extends ActiveMQTestBase */ private void printJournal() throws Exception { - NIOSequentialFileFactory factory = new NIOSequentialFileFactory(new File(getJournalDir())); + NIOSequentialFileFactory factory = new NIOSequentialFileFactory(new File(getJournalDir()), 100); JournalImpl journal = new JournalImpl(ActiveMQDefaultConfiguration.getDefaultJournalFileSize(), 2, 0, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LibaioDependencyCheckTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LibaioDependencyCheckTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LibaioDependencyCheckTest.java index 4e6191e..9044b28 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LibaioDependencyCheckTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LibaioDependencyCheckTest.java @@ -16,9 +16,9 @@ */ package org.apache.activemq.artemis.tests.integration.client; +import org.apache.activemq.artemis.jlibaio.LibaioContext; import org.junit.Test; -import org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; /** @@ -45,7 +45,7 @@ public class LibaioDependencyCheckTest extends ActiveMQTestBase { if (System.getProperties().get("os.name").equals("Linux")) { - assertTrue("Libaio is not available on this platform", AsynchronousFileImpl.isLoaded()); + assertTrue("Libaio is not available on this platform", LibaioContext.isLoaded()); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java index 9d0d4d7..b870639 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java @@ -46,15 +46,15 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; @@ -1734,7 +1734,7 @@ public class PagingTest extends ActiveMQTestBase 2, 0, 0, - new NIOSequentialFileFactory(server.getConfiguration().getJournalLocation()), + new NIOSequentialFileFactory(server.getConfiguration().getJournalLocation(), 1), "activemq-data", "amq", 1); @@ -6454,7 +6454,7 @@ public class PagingTest extends ActiveMQTestBase pageDone.countDown(); } - public void executeOnCompletion(IOAsyncTask runnable) + public void executeOnCompletion(IOCallback runnable) { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RedeliveryConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RedeliveryConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RedeliveryConsumerTest.java index 460c4df..dea83e6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RedeliveryConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RedeliveryConsumerTest.java @@ -32,7 +32,7 @@ import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; @@ -297,7 +297,7 @@ public class RedeliveryConsumerTest extends ActiveMQTestBase 2, 0, 0, - new NIOSequentialFileFactory(server.getConfiguration().getJournalLocation()), + new NIOSequentialFileFactory(server.getConfiguration().getJournalLocation(), 1), "activemq-data", "amq", 1); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java index ede895f..151e2b4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java @@ -45,9 +45,9 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; @@ -1923,7 +1923,7 @@ public class BridgeTest extends ActiveMQTestBase protected Map<Long, AtomicInteger> loadQueues(ActiveMQServer serverToInvestigate) throws Exception { SequentialFileFactory messagesFF = new NIOSequentialFileFactory(serverToInvestigate.getConfiguration() - .getJournalLocation()); + .getJournalLocation(), 1); JournalImpl messagesJournal = new JournalImpl(serverToInvestigate.getConfiguration().getJournalFileSize(), serverToInvestigate.getConfiguration().getJournalMinFiles(),
