ARTEMIS-832 OpenWire was ignoring data syncs. I'm also adding the possibility of syncing on libaio, rather than relying only on the write cache
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/bcbbc868 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/bcbbc868 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/bcbbc868 Branch: refs/heads/ARTEMIS-780 Commit: bcbbc86856cbb9679ce6886852797b3360605730 Parents: 749b831 Author: Clebert Suconic <[email protected]> Authored: Tue Nov 1 21:38:02 2016 -0400 Committer: jbertram <[email protected]> Committed: Thu Nov 3 20:35:15 2016 -0500 ---------------------------------------------------------------------- .../activemq/artemis/cli/commands/Create.java | 8 +- .../cli/commands/util/SyncCalculation.java | 7 +- .../artemis/cli/commands/etc/broker.xml | 2 + .../apache/activemq/cli/test/ArtemisTest.java | 8 +- .../config/ActiveMQDefaultConfiguration.java | 7 + .../client/impl/ClientSessionFactoryImpl.java | 3 +- .../store/file/JDBCSequentialFileFactory.java | 12 + .../artemis/core/io/AbstractSequentialFile.java | 22 -- .../core/io/AbstractSequentialFileFactory.java | 15 ++ .../artemis/core/io/SequentialFileFactory.java | 4 + .../artemis/core/io/aio/AIOSequentialFile.java | 8 +- .../core/io/aio/AIOSequentialFileFactory.java | 2 +- .../core/io/mapped/MappedSequentialFile.java | 19 +- .../io/mapped/MappedSequentialFileFactory.java | 14 +- .../artemis/core/io/nio/NIOSequentialFile.java | 65 +---- artemis-native/bin/libartemis-native-64.so | Bin 25003 -> 28687 bytes ...che_activemq_artemis_jlibaio_LibaioContext.c | 11 +- .../activemq/artemis/jlibaio/LibaioContext.java | 12 +- .../artemis/jlibaio/test/LibaioTest.java | 10 +- .../jlibaio/test/OpenCloseContextTest.java | 8 +- .../amqp/broker/AMQPConnectionCallback.java | 2 +- .../amqp/broker/AMQPSessionCallback.java | 9 +- .../protocol/mqtt/MQTTConnectionManager.java | 3 +- .../protocol/openwire/OpenWireConnection.java | 107 +++++---- .../core/protocol/openwire/amq/AMQSession.java | 4 +- 
.../protocol/stomp/StompProtocolManager.java | 4 +- .../artemis/core/config/Configuration.java | 17 ++ .../core/config/impl/ConfigurationImpl.java | 13 + .../deployers/impl/FileConfigurationParser.java | 2 + .../impl/journal/JournalStorageManager.java | 4 + .../core/impl/ActiveMQPacketHandler.java | 5 +- .../artemis/core/server/ActiveMQServer.java | 6 +- .../core/server/impl/ActiveMQServerImpl.java | 9 +- .../resources/schema/artemis-configuration.xsd | 8 + .../core/config/impl/FileConfigurationTest.java | 2 + .../artemis/tests/util/ActiveMQTestBase.java | 2 +- .../resources/ConfigurationTest-full-config.xml | 1 + docs/user-manual/en/configuration-index.md | 1 + docs/user-manual/en/persistence.md | 4 + .../integration/persistence/SyncSendTest.java | 235 +++++++++++++++++++ .../vertx/ActiveMQVertxUnitTest.java | 5 +- .../impl/fakes/FakeSequentialFileFactory.java | 10 + 42 files changed, 505 insertions(+), 185 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java index ecb9e49..be788cd 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java @@ -213,6 +213,9 @@ public class Create extends InputAbstract { @Option(name = "--no-hornetq-acceptor", description = "Disable the HornetQ specific acceptor.") boolean noHornetQAcceptor; + @Option(name = "--no-fsync", description = "Disable usage of fdatasync (channel.force(false) from java nio) on the journal") + boolean noJournalSync; + boolean IS_WINDOWS; boolean IS_CYGWIN; @@ -567,6 +570,7 @@ 
public class Create extends InputAbstract { filters.put("${web.protocol}", "http"); filters.put("${extra.web.attributes}", ""); } + filters.put("${fsync}", String.valueOf(!noJournalSync)); filters.put("${user}", System.getProperty("user.name", "")); filters.put("${default.port}", String.valueOf(defaultPort + portOffset)); filters.put("${amqp.port}", String.valueOf(AMQP_PORT + portOffset)); @@ -776,7 +780,7 @@ public class Create extends InputAbstract { System.out.println(""); System.out.println("Auto tuning journal ..."); - long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, aio); + long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, aio); long nanoseconds = SyncCalculation.toNanos(time, writes); double writesPerMillisecond = (double) writes / (double) time; @@ -807,7 +811,7 @@ public class Create extends InputAbstract { // forcing NIO return false; } else if (LibaioContext.isLoaded()) { - try (LibaioContext context = new LibaioContext(1, true)) { + try (LibaioContext context = new LibaioContext(1, true, true)) { File tmpFile = new File(directory, "validateAIO.bin"); boolean supportsLibaio = true; try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java index 468eabf..315ebdc 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java @@ -46,8 +46,9 @@ public class SyncCalculation { int blocks, int tries, boolean verbose, + boolean fsync, boolean aio) throws Exception { - 
SequentialFileFactory factory = newFactory(datafolder, aio); + SequentialFileFactory factory = newFactory(datafolder, fsync, aio); SequentialFile file = factory.createSequentialFile("test.tmp"); try { @@ -149,9 +150,9 @@ public class SyncCalculation { return timeWait; } - private static SequentialFileFactory newFactory(File datafolder, boolean aio) { + private static SequentialFileFactory newFactory(File datafolder, boolean datasync, boolean aio) { if (aio && LibaioContext.isLoaded()) { - SequentialFileFactory factory = new AIOSequentialFileFactory(datafolder, 1); + SequentialFileFactory factory = new AIOSequentialFileFactory(datafolder, 1).setDatasync(datasync); factory.start(); ((AIOSequentialFileFactory) factory).disableBufferReuse(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml index fe28246..58c103c 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml @@ -45,6 +45,8 @@ under the License. 
<large-messages-directory>${data.dir}/large-messages</large-messages-directory> + <journal-datasync>${fsync}</journal-datasync> + <journal-min-files>2</journal-min-files> <journal-pool-files>-1</journal-pool-files> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java index ba78fb2..2359f1d 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java @@ -129,7 +129,7 @@ public class ArtemisTest { public void testSync() throws Exception { int writes = 20; int tries = 10; - long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true); + long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true, true); System.out.println(); System.out.println("TotalAvg = " + totalAvg); long nanoTime = SyncCalculation.toNanos(totalAvg, writes); @@ -144,7 +144,7 @@ public class ArtemisTest { Run.setEmbedded(true); //instance1: default using http File instance1 = new File(temporaryFolder.getRoot(), "instance1"); - Artemis.main("create", instance1.getAbsolutePath(), "--silent"); + Artemis.main("create", instance1.getAbsolutePath(), "--silent", "--no-fsync"); File bootstrapFile = new File(new File(instance1, "etc"), "bootstrap.xml"); Assert.assertTrue(bootstrapFile.exists()); Document config = parseXml(bootstrapFile); @@ -163,7 +163,7 @@ public class ArtemisTest { //instance2: https File instance2 = new File(temporaryFolder.getRoot(), "instance2"); - Artemis.main("create", instance2.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1"); + 
Artemis.main("create", instance2.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1", "--no-fsync"); bootstrapFile = new File(new File(instance2, "etc"), "bootstrap.xml"); Assert.assertTrue(bootstrapFile.exists()); config = parseXml(bootstrapFile); @@ -184,7 +184,7 @@ public class ArtemisTest { //instance3: https with clientAuth File instance3 = new File(temporaryFolder.getRoot(), "instance3"); - Artemis.main("create", instance3.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1", "--use-client-auth", "--ssl-trust", "etc/truststore", "--ssl-trust-password", "password2"); + Artemis.main("create", instance3.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1", "--use-client-auth", "--ssl-trust", "etc/truststore", "--ssl-trust-password", "password2", "--no-fsync"); bootstrapFile = new File(new File(instance3, "etc"), "bootstrap.xml"); Assert.assertTrue(bootstrapFile.exists()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 04d06c0..b952430 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -130,6 +130,9 @@ public final class ActiveMQDefaultConfiguration { // true means that the server will use the file based journal for persistence. 
private static boolean DEFAULT_PERSISTENCE_ENABLED = true; + // true means that the server will sync data files + private static boolean DEFAULT_JOURNAL_DATASYNC = true; + // Maximum number of threads to use for the scheduled thread pool private static int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 5; @@ -460,6 +463,10 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_PERSISTENCE_ENABLED; } + public static boolean isDefaultJournalDatasync() { + return DEFAULT_JOURNAL_DATASYNC; + } + /** * Maximum number of threads to use for the scheduled thread pool */ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index d781fff..d2d9886 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -93,7 +93,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C private final long connectionTTL; - private final Set<ClientSessionInternal> sessions = new HashSet<>(); + private final Set<ClientSessionInternal> sessions = new ConcurrentHashSet<>(); private final Object createSessionLock = new Object(); @@ -506,6 +506,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C // this is just a debug, since an interrupt is an expected event (in case of a shutdown) logger.debug(e1.getMessage(), e1); } catch (Throwable t) { + logger.warn(t.getMessage(), t); //for anything else just close so clients are un blocked 
close(); throw t; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java index cafb261..66f00ec 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java @@ -61,6 +61,18 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM } @Override + public SequentialFileFactory setDatasync(boolean enabled) { + + // noop + return this; + } + + @Override + public boolean isDatasync() { + return false; + } + + @Override public synchronized void start() { try { if (!started) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java index 0c6dcdf..cd15246 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java @@ -21,9 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.List; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; 
-import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -59,11 +57,6 @@ public abstract class AbstractSequentialFile implements SequentialFile { protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver(); /** - * Used for asynchronous writes - */ - protected final Executor writerExecutor; - - /** * @param file * @param directory */ @@ -75,7 +68,6 @@ public abstract class AbstractSequentialFile implements SequentialFile { this.file = new File(directory, file); this.directory = directory; this.factory = factory; - this.writerExecutor = writerExecutor; } // Public -------------------------------------------------------- @@ -166,20 +158,6 @@ public abstract class AbstractSequentialFile implements SequentialFile { */ @Override public synchronized void close() throws IOException, InterruptedException, ActiveMQException { - final CountDownLatch donelatch = new CountDownLatch(1); - - if (writerExecutor != null) { - writerExecutor.execute(new Runnable() { - @Override - public void run() { - donelatch.countDown(); - } - }); - - while (!donelatch.await(60, TimeUnit.SECONDS)) { - ActiveMQJournalLogger.LOGGER.couldNotCompleteTask(new Exception("trace"), file.getName()); - } - } } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java index 6e61c86..5aa723d 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java +++ 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java @@ -52,6 +52,8 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac protected final int maxIO; + protected boolean dataSync = true; + private final IOCriticalErrorListener critialErrorListener; /** @@ -81,6 +83,19 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac this.maxIO = maxIO; } + + @Override + public SequentialFileFactory setDatasync(boolean enabled) { + this.dataSync = enabled; + return this; + } + + @Override + public boolean isDatasync() { + return dataSync; + } + + @Override public void stop() { if (timedBuffer != null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java index 81203cf..2229edf 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java @@ -95,4 +95,8 @@ public interface SequentialFileFactory { void createDirs() throws Exception; void flush(); + + SequentialFileFactory setDatasync(boolean enabled); + + boolean isDatasync(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java 
index a0d20d2..874e411 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java @@ -97,7 +97,7 @@ public class AIOSequentialFile extends AbstractSequentialFile { @Override public SequentialFile cloneFile() { - return new AIOSequentialFile(aioFactory, -1, -1, getFile().getParentFile(), getFile().getName(), writerExecutor); + return new AIOSequentialFile(aioFactory, -1, -1, getFile().getParentFile(), getFile().getName(), null); } @Override @@ -214,11 +214,7 @@ public class AIOSequentialFile extends AbstractSequentialFile { AIOSequentialFileFactory.AIOSequentialCallback runnableCallback = getCallback(callback, bytes); runnableCallback.initWrite(positionToWrite, bytesToWrite); - if (writerExecutor != null) { - writerExecutor.execute(runnableCallback); - } else { - runnableCallback.run(); - } + runnableCallback.run(); } AIOSequentialFileFactory.AIOSequentialCallback getCallback(IOCallback originalCallback, ByteBuffer buffer) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java index da0d079..57d18f5 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java @@ -211,7 +211,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor if (running.compareAndSet(false, true)) { super.start(); - this.libaioContext = new LibaioContext(maxIO, true); + 
this.libaioContext = new LibaioContext(maxIO, true, dataSync); this.running.set(true); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java index 522dbd1..017948b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java @@ -49,12 +49,15 @@ final class MappedSequentialFile implements SequentialFile { private String fileName; private MappedFile mappedFile; private ActiveMQBuffer pooledActiveMQBuffer; + private final MappedSequentialFileFactory factory; - MappedSequentialFile(final File directory, + MappedSequentialFile(MappedSequentialFileFactory factory, + final File directory, final File file, final long chunkBytes, final long overlapBytes, final IOCriticalErrorListener criticalErrorListener) { + this.factory = factory; this.directory = directory; this.file = file; this.absoluteFile = null; @@ -155,7 +158,7 @@ final class MappedSequentialFile implements SequentialFile { final int readableBytes = writerIndex - readerIndex; if (readableBytes > 0) { this.mappedFile.write(byteBuf, readerIndex, readableBytes); - if (sync) { + if (factory.isDatasync() && sync) { this.mappedFile.force(); } } @@ -178,7 +181,7 @@ final class MappedSequentialFile implements SequentialFile { final int readableBytes = writerIndex - readerIndex; if (readableBytes > 0) { this.mappedFile.write(byteBuf, readerIndex, readableBytes); - if (sync) { + if (factory.isDatasync() && sync) { this.mappedFile.force(); } } @@ -209,7 +212,7 @@ final class 
MappedSequentialFile implements SequentialFile { final int readableBytes = writerIndex - readerIndex; if (readableBytes > 0) { this.mappedFile.write(byteBuf, readerIndex, readableBytes); - if (sync) { + if (factory.isDatasync() && sync) { this.mappedFile.force(); } } @@ -235,7 +238,7 @@ final class MappedSequentialFile implements SequentialFile { final int readableBytes = writerIndex - readerIndex; if (readableBytes > 0) { this.mappedFile.write(byteBuf, readerIndex, readableBytes); - if (sync) { + if (factory.isDatasync() && sync) { this.mappedFile.force(); } } @@ -253,7 +256,7 @@ final class MappedSequentialFile implements SequentialFile { final int remaining = limit - position; if (remaining > 0) { this.mappedFile.write(bytes, position, remaining); - if (sync) { + if (factory.isDatasync() && sync) { this.mappedFile.force(); } } @@ -275,7 +278,7 @@ final class MappedSequentialFile implements SequentialFile { final int remaining = limit - position; if (remaining > 0) { this.mappedFile.write(bytes, position, remaining); - if (sync) { + if (factory.isDatasync() && sync) { this.mappedFile.force(); } } @@ -381,7 +384,7 @@ final class MappedSequentialFile implements SequentialFile { @Override public SequentialFile cloneFile() { checkIsNotOpen(); - return new MappedSequentialFile(this.directory, this.file, this.chunkBytes, this.overlapBytes, this.criticalErrorListener); + return new MappedSequentialFile(factory, this.directory, this.file, this.chunkBytes, this.overlapBytes, this.criticalErrorListener); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java index 23af0b6..8ccef74 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java @@ -37,6 +37,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory private final IOCriticalErrorListener criticalErrorListener; private long chunkBytes; private long overlapBytes; + private boolean useDataSync; public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) { this.directory = directory; @@ -72,7 +73,18 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory @Override public SequentialFile createSequentialFile(String fileName) { - return new MappedSequentialFile(directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener); + return new MappedSequentialFile(this, directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener); + } + + @Override + public SequentialFileFactory setDatasync(boolean enabled) { + this.useDataSync = enabled; + return this; + } + + @Override + public boolean isDatasync() { + return useDataSync; } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java index 40e0544..2887d25 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java +++ 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java @@ -23,8 +23,6 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.concurrent.Executor; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -35,7 +33,6 @@ import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.journal.ActiveMQJournalBundle; -import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; public final class NIOSequentialFile extends AbstractSequentialFile { @@ -43,11 +40,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile { private RandomAccessFile rfile; - /** - * The write semaphore here is only used when writing asynchronously - */ - private Semaphore maxIOSemaphore; - private final int defaultMaxIO; private int maxIO; @@ -99,11 +91,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile { factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); throw e; } - - if (writerExecutor != null && useExecutor) { - maxIOSemaphore = new Semaphore(maxIO); - this.maxIO = maxIO; - } } @Override @@ -124,6 +111,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile { factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); throw e; } + channel.force(true); fileSize = channel.size(); } @@ -138,13 +126,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile { public synchronized void close() throws IOException, InterruptedException, ActiveMQException { super.close(); - if (maxIOSemaphore != null) { - while 
(!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS)) { - ActiveMQJournalLogger.LOGGER.errorClosingFile(getFileName()); - } - } - - maxIOSemaphore = null; try { if (channel != null) { channel.close(); @@ -202,7 +183,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile { @Override public void sync() throws IOException { - if (channel != null) { + if (factory.isDatasync() && channel != null) { try { channel.force(false); } catch (ClosedChannelException e) { @@ -250,7 +231,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile { @Override public SequentialFile cloneFile() { - return new NIOSequentialFile(factory, directory, getFileName(), maxIO, writerExecutor); + return new NIOSequentialFile(factory, directory, getFileName(), maxIO, null); } @Override @@ -298,40 +279,12 @@ public final class NIOSequentialFile extends AbstractSequentialFile { position.addAndGet(bytes.limit()); - if (maxIOSemaphore == null || callback == null) { - // if maxIOSemaphore == null, that means we are not using executors and the writes are synchronous - try { - doInternalWrite(bytes, sync, callback); - } catch (ClosedChannelException e) { - throw e; - } catch (IOException e) { - factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); - } - } else { - // This is a flow control on writing, just like maxAIO on libaio - maxIOSemaphore.acquire(); - - writerExecutor.execute(new Runnable() { - @Override - public void run() { - try { - try { - doInternalWrite(bytes, sync, callback); - } catch (ClosedChannelException e) { - ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e); - } catch (IOException e) { - ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e); - factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), NIOSequentialFile.this); - callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage()); - } catch (Throwable e) { - ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e); - 
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage()); - } - } finally { - maxIOSemaphore.release(); - } - } - }); + try { + doInternalWrite(bytes, sync, callback); + } catch (ClosedChannelException e) { + throw e; + } catch (IOException e) { + factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-native/bin/libartemis-native-64.so ---------------------------------------------------------------------- diff --git a/artemis-native/bin/libartemis-native-64.so b/artemis-native/bin/libartemis-native-64.so old mode 100644 new mode 100755 index 95a5451..8cbe851 Binary files a/artemis-native/bin/libartemis-native-64.so and b/artemis-native/bin/libartemis-native-64.so differ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c b/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c index 74545fc..3f7c213 100644 --- a/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c +++ b/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c @@ -536,7 +536,7 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_su } JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_blockedPoll - (JNIEnv * env, jobject thisObject, jobject contextPointer) { + (JNIEnv * env, jobject thisObject, jobject contextPointer, jboolean useFdatasync) { #ifdef DEBUG fprintf (stdout, "Running blockedPoll\n"); @@ -553,6 +553,8 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl short running = 1; + int lastFile = -1; + while (running) { int result = 
io_getevents(theControl->ioContext, 1, max, theControl->events, 0); @@ -574,6 +576,8 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl fflush(stdout); #endif + lastFile = -1; + for (i = 0; i < result; i++) { #ifdef DEBUG @@ -593,6 +597,11 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl break; } + if (useFdatasync && lastFile != iocbp->aio_fildes) { + lastFile = iocbp->aio_fildes; + fdatasync(lastFile); + } + int eventResult = (int)event->res; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java index 8049a97..cdaea55 100644 --- a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java +++ b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java @@ -49,7 +49,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable { * <br> * Or else the native module won't be loaded because of version mismatches */ - private static final int EXPECTED_NATIVE_VERSION = 6; + private static final int EXPECTED_NATIVE_VERSION = 7; private static boolean loaded = false; @@ -146,6 +146,8 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable { final int queueSize; + final boolean useFdatasync; + /** * The queue size here will use resources defined on the kernel parameter * <a href="https://www.kernel.org/doc/Documentation/sysctl/fs.txt">fs.aio-max-nr</a> . 
@@ -153,11 +155,13 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable { * @param queueSize the size to be initialize on libaio * io_queue_init which can't be higher than /proc/sys/fs/aio-max-nr. * @param useSemaphore should block on a semaphore avoiding using more submits than what's available. + * @param useFdatasync should use fdatasync before calling callbacks. */ - public LibaioContext(int queueSize, boolean useSemaphore) { + public LibaioContext(int queueSize, boolean useSemaphore, boolean useFdatasync) { try { contexts.incrementAndGet(); this.ioContext = newContext(queueSize); + this.useFdatasync = useFdatasync; } catch (Exception e) { throw e; } @@ -349,7 +353,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable { */ public void poll() { if (!closed.get()) { - blockedPoll(ioContext); + blockedPoll(ioContext, useFdatasync); } } @@ -436,7 +440,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable { /** * This method will block as long as the context is open. 
*/ - native void blockedPoll(ByteBuffer libaioContext); + native void blockedPoll(ByteBuffer libaioContext, boolean useFdatasync); static native int getNativeVersion(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java ---------------------------------------------------------------------- diff --git a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java index 7f98f0d..1013966 100644 --- a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java +++ b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java @@ -54,7 +54,7 @@ public class LibaioTest { parent.mkdirs(); boolean failed = false; - try (LibaioContext control = new LibaioContext<>(1, true); LibaioFile fileDescriptor = control.openFile(file, true)) { + try (LibaioContext control = new LibaioContext<>(1, true, true); LibaioFile fileDescriptor = control.openFile(file, true)) { fileDescriptor.fallocate(4 * 1024); } catch (Exception e) { e.printStackTrace(); @@ -80,7 +80,7 @@ public class LibaioTest { @Before public void setUpFactory() { - control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, true); + control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, true, true); } @After @@ -532,10 +532,10 @@ public class LibaioTest { boolean exceptionThrown = false; control.close(); - control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, false); + control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, false, true); try { // There is no space for a queue this huge, the native layer should throw the exception - LibaioContext newController = new LibaioContext(Integer.MAX_VALUE, false); + LibaioContext newController = new LibaioContext(Integer.MAX_VALUE, false, true); } catch (RuntimeException e) { exceptionThrown = true; } @@ -630,7 +630,7 @@ public class LibaioTest 
{ @Test public void testBlockedCallback() throws Exception { - final LibaioContext blockedContext = new LibaioContext(500, true); + final LibaioContext blockedContext = new LibaioContext(500, true, true); Thread t = new Thread() { @Override public void run() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java ---------------------------------------------------------------------- diff --git a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java index c04bff4..b515663 100644 --- a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java +++ b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java @@ -53,7 +53,7 @@ public class OpenCloseContextTest { for (int i = 0; i < 10; i++) { System.out.println("#test " + i); - final LibaioContext control = new LibaioContext<>(5, true); + final LibaioContext control = new LibaioContext<>(5, true, true); Thread t = new Thread() { @Override public void run() { @@ -111,7 +111,7 @@ public class OpenCloseContextTest { for (int i = 0; i < 10; i++) { System.out.println("#test " + i); - final LibaioContext control = new LibaioContext<>(5, true); + final LibaioContext control = new LibaioContext<>(5, true, true); Thread t = new Thread() { @Override public void run() { @@ -164,9 +164,9 @@ public class OpenCloseContextTest { @Test public void testCloseAndStart() throws Exception { - final LibaioContext control2 = new LibaioContext<>(5, true); + final LibaioContext control2 = new LibaioContext<>(5, true, true); - final LibaioContext control = new LibaioContext<>(5, true); + final LibaioContext control = new LibaioContext<>(5, true, true); control.close(); control.poll(); 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java index 4ced546..24c625c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java @@ -177,7 +177,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener { } public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) { - return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor); + return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor, server.newOperationContext()); } public void sendSASLSupported() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 66c7b4b..acbb2e9 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; @@ -81,6 +82,8 @@ public class AMQPSessionCallback implements SessionCallback { private ServerSession serverSession; + private final OperationContext operationContext; + private AMQPSessionContext protonSession; private final Executor closeExecutor; @@ -91,12 +94,14 @@ public class AMQPSessionCallback implements SessionCallback { ProtonProtocolManager manager, AMQPConnectionContext connection, Connection transportConnection, - Executor executor) { + Executor executor, + OperationContext operationContext) { this.protonSPI = protonSPI; this.manager = manager; this.connection = connection; this.transportConnection = transportConnection; this.closeExecutor = executor; + this.operationContext = operationContext; } @Override @@ -151,7 +156,7 @@ public class AMQPSessionCallback implements SessionCallback { false, // boolean autoCommitAcks, false, // boolean preAcknowledge, true, //boolean xa, - (String) null, this, true); + (String) null, this, true, operationContext); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java ---------------------------------------------------------------------- diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index 3a1f447..ce65648 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -96,7 +96,8 @@ public class MQTTConnectionManager { String id = UUIDGenerator.getInstance().generateStringUUID(); ActiveMQServer server = session.getServer(); - ServerSession serverSession = server.createSession(id, username, password, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, MQTTUtil.SESSION_XA, null, session.getSessionCallback(), MQTTUtil.SESSION_AUTO_CREATE_QUEUE); + ServerSession serverSession = server.createSession(id, username, password, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, MQTTUtil.SESSION_XA, null, + session.getSessionCallback(), MQTTUtil.SESSION_AUTO_CREATE_QUEUE, server.newOperationContext()); return (ServerSessionImpl) serverSession; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 33418e6..8dc0b34 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; @@ -119,12 +120,15 @@ import org.apache.activemq.state.SessionState; import org.apache.activemq.transport.TransmitCallback; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.wireformat.WireFormat; +import org.jboss.logging.Logger; /** * Represents an activemq connection. 
*/ public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth { + private static final Logger logger = Logger.getLogger(OpenWireConnection.class); + private static final KeepAliveInfo PING = new KeepAliveInfo(); private final OpenWireProtocolManager protocolManager; @@ -139,17 +143,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private final AtomicBoolean stopping = new AtomicBoolean(false); - private boolean inServiceException; - - private final AtomicBoolean asyncException = new AtomicBoolean(false); - - // Clebert: Artemis session has meta-data support, perhaps we could reuse it here private final Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<>(); private final Map<ConsumerId, AMQConsumerBrokerExchange> consumerExchanges = new ConcurrentHashMap<>(); private final Map<ProducerId, AMQProducerBrokerExchange> producerExchanges = new ConcurrentHashMap<>(); - // Clebert TODO: Artemis already stores the Session. 
Why do we need a different one here private final Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>(); private ConnectionState state; @@ -172,6 +170,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se */ private ServerSession internalSession; + private final OperationContext operationContext; + private volatile long lastSent = -1; private ConnectionEntry connectionEntry; private boolean useKeepAlive; @@ -185,6 +185,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se OpenWireFormat wf) { super(connection, executor); this.server = server; + this.operationContext = server.newOperationContext(); this.protocolManager = openWireProtocolManager; this.wireFormat = wf; this.useKeepAlive = openWireProtocolManager.isUseKeepAlive(); @@ -201,6 +202,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se return info.getUserName(); } + + public OperationContext getOperationContext() { + return operationContext; + } + // SecurityAuth implementation @Override public RemotingConnection getRemotingConnection() { @@ -239,6 +245,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se super.bufferReceived(connectionID, buffer); try { + recoverOperationContext(); + Command command = (Command) wireFormat.unmarshal(buffer); boolean responseRequired = command.isResponseRequired(); @@ -285,17 +293,38 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } - // TODO: response through operation-context - - if (response != null && !protocolManager.isStopping()) { - response.setCorrelationId(commandId); - dispatchSync(response); - } + sendAsyncResponse(commandId, response); } } catch (Exception e) { ActiveMQServerLogger.LOGGER.debug(e); sendException(e); + } finally { + clearupOperationContext(); + } + } + + /** It will send the response through the operation context, as soon as everything is confirmed on disk */ + private 
void sendAsyncResponse(final int commandId, final Response response) throws Exception { + if (response != null) { + operationContext.executeOnCompletion(new IOCallback() { + @Override + public void done() { + if (!protocolManager.isStopping()) { + try { + response.setCorrelationId(commandId); + dispatchSync(response); + } catch (Exception e) { + sendException(e); + } + } + } + + @Override + public void onError(int errorCode, String errorMessage) { + sendException(new IOException(errorCode + "-" + errorMessage)); + } + }); } } @@ -626,7 +655,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } private void createInternalSession(ConnectionInfo info) throws Exception { - internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true); + internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext); } //raise the refCount of context @@ -1083,7 +1112,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public Response processBeginTransaction(TransactionInfo info) throws Exception { final TransactionId txID = info.getTransactionId(); - setOperationContext(null); try { internalSession.resetTX(null); if (txID.isXATransaction()) { @@ -1101,7 +1129,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } finally { internalSession.resetTX(null); - clearOpeartionContext(); } return null; } @@ -1118,12 +1145,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se AMQSession session = (AMQSession) tx.getProtocolData(); - setOperationContext(session); - try { - tx.commit(onePhase); - } finally { - clearOpeartionContext(); - } + tx.commit(onePhase); return null; } @@ -1137,21 +1159,16 @@ 
public class OpenWireConnection extends AbstractRemotingConnection implements Se public Response processForgetTransaction(TransactionInfo info) throws Exception { TransactionId txID = info.getTransactionId(); - setOperationContext(null); - try { - if (txID.isXATransaction()) { - try { - Xid xid = OpenWireUtil.toXID(info.getTransactionId()); - internalSession.xaForget(xid); - } catch (Exception e) { - e.printStackTrace(); - throw e; - } - } else { - txMap.remove(txID); + if (txID.isXATransaction()) { + try { + Xid xid = OpenWireUtil.toXID(info.getTransactionId()); + internalSession.xaForget(xid); + } catch (Exception e) { + e.printStackTrace(); + throw e; } - } finally { - clearOpeartionContext(); + } else { + txMap.remove(txID); } return null; @@ -1161,7 +1178,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public Response processPrepareTransaction(TransactionInfo info) throws Exception { TransactionId txID = info.getTransactionId(); - setOperationContext(null); try { if (txID.isXATransaction()) { try { @@ -1177,7 +1193,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } finally { internalSession.resetTX(null); - clearOpeartionContext(); } return new IntegerResponse(XAResource.XA_RDONLY); @@ -1187,7 +1202,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public Response processEndTransaction(TransactionInfo info) throws Exception { TransactionId txID = info.getTransactionId(); - setOperationContext(null); if (txID.isXATransaction()) { try { Transaction tx = lookupTX(txID, null); @@ -1204,7 +1218,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } else { txMap.remove(txID); - clearOpeartionContext(); } return null; @@ -1267,13 +1280,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se Transaction tx = lookupTX(messageSend.getTransactionId(), session); - setOperationContext(session); 
session.getCoreSession().resetTX(tx); try { session.send(producerInfo, messageSend, sendProducerAck); } finally { session.getCoreSession().resetTX(null); - clearOpeartionContext(); } return null; @@ -1283,7 +1294,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public Response processMessageAck(MessageAck ack) throws Exception { AMQSession session = getSession(ack.getConsumerId().getParentId()); Transaction tx = lookupTX(ack.getTransactionId(), session); - setOperationContext(session); session.getCoreSession().resetTX(tx); try { @@ -1291,7 +1301,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se consumerBrokerExchange.acknowledge(ack); } finally { session.getCoreSession().resetTX(null); - clearOpeartionContext(); } return null; } @@ -1367,17 +1376,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } - private void setOperationContext(AMQSession session) { - OperationContext ctx; - if (session == null) { - ctx = this.internalSession.getSessionContext(); - } else { - ctx = session.getCoreSession().getSessionContext(); - } - server.getStorageManager().setContext(ctx); + private void recoverOperationContext() { + server.getStorageManager().setContext(this.operationContext); } - private void clearOpeartionContext() { + private void clearupOperationContext() { server.getStorageManager().clearContext(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 714a29a..426f4e6 
100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -107,7 +107,7 @@ public class AMQSession implements SessionCallback { // now try { - coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true); + coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext()); long sessionId = sessInfo.getSessionId().getValue(); if (sessionId == -1) { @@ -290,8 +290,6 @@ public class AMQSession implements SessionCallback { } else { final Connection transportConnection = connection.getTransportConnection(); - // new Exception("Setting to false").printStackTrace(); - if (transportConnection == null) { // I don't think this could happen, but just in case, avoiding races runnable = null; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 46f8e4c..6029b37 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -230,7 +230,7 @@ public class StompProtocolManager 
extends AbstractProtocolManager<StompFrame, St if (stompSession == null) { stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor())); String name = UUIDGenerator.getInstance().generateStringUUID(); - ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, true); + ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, true, server.newOperationContext()); stompSession.setServerSession(session); sessions.put(connection.getID(), stompSession); } @@ -243,7 +243,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St if (stompSession == null) { stompSession = new StompSession(connection, this, server.getStorageManager().newContext(executor)); String name = UUIDGenerator.getInstance().generateStringUUID(); - ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, true); + ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, true, server.newOperationContext()); stompSession.setServerSession(session); transactedSessions.put(txID, stompSession); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 17a305e..8d47f97 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -79,6 +79,23 @@ public interface Configuration { Configuration setPersistenceEnabled(boolean enable); /** + * Should use fdatasync on journal files. + * + * @see <a href="http://man7.org/linux/man-pages/man2/fdatasync.2.html">fdatasync</a> + * + * @return a boolean + */ + boolean isJournalDatasync(); + + /** + * documented at {@link #isJournalDatasync()} + * + * @param enable + * @return this + */ + Configuration setJournalDatasync(boolean enable); + + /** * @return usernames mapped to ResourceLimitSettings */ Map<String, ResourceLimitSettings> getResourceLimitSettings(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 8ff1922..3b66f83 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -78,6 +78,8 @@ public class ConfigurationImpl implements Configuration, Serializable { private boolean persistenceEnabled = ActiveMQDefaultConfiguration.isDefaultPersistenceEnabled(); + private boolean journalDatasync = ActiveMQDefaultConfiguration.isDefaultJournalDatasync(); + protected long fileDeploymentScanPeriod = ActiveMQDefaultConfiguration.getDefaultFileDeployerScanPeriod(); private boolean 
persistDeliveryCountBeforeDelivery = ActiveMQDefaultConfiguration.isDefaultPersistDeliveryCountBeforeDelivery(); @@ -301,6 +303,17 @@ public class ConfigurationImpl implements Configuration, Serializable { } @Override + public boolean isJournalDatasync() { + return journalDatasync; + } + + @Override + public ConfigurationImpl setJournalDatasync(boolean enable) { + journalDatasync = enable; + return this; + } + + @Override public long getFileDeployerScanPeriod() { return fileDeploymentScanPeriod; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 2dccb03..a77b850 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -488,6 +488,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { } } + config.setJournalDatasync(getBoolean(e, "journal-datasync", config.isJournalDatasync())); + config.setJournalSyncTransactional(getBoolean(e, "journal-sync-transactional", config.isJournalSyncTransactional())); config.setJournalSyncNonTransactional(getBoolean(e, "journal-sync-non-transactional", config.isJournalSyncNonTransactional())); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---------------------------------------------------------------------- diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 24650e1..c0ef93e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -118,6 +118,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { } bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO()); + bindingsFF.setDatasync(config.isJournalDatasync()); Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1, 0); @@ -135,6 +136,9 @@ public class JournalStorageManager extends AbstractJournalStorageManager { throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType()); } + journalFF.setDatasync(config.isJournalDatasync()); + + Journal localMessage = new JournalImpl(ioExecutors, config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? 
config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO(), 0); messageJournal = localMessage; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java index 149c011..64e496a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java @@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.ChannelHandler; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; @@ -150,7 +151,9 @@ public class ActiveMQPacketHandler implements ChannelHandler { activeMQPrincipal = connection.getDefaultActiveMQPrincipal(); } - ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? 
request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true); + OperationContext sessionOperationContext = server.newOperationContext(); + + ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext); ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel); channel.setHandler(handler); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index a266bff..9b5578c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl; import 
org.apache.activemq.artemis.core.paging.PagingManager; +import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.remoting.server.RemotingService; @@ -181,7 +182,8 @@ public interface ActiveMQServer extends ActiveMQComponent { boolean xa, String defaultAddress, SessionCallback callback, - boolean autoCreateQueues) throws Exception; + boolean autoCreateQueues, + OperationContext context) throws Exception; SecurityStore getSecurityStore(); @@ -193,6 +195,8 @@ public interface ActiveMQServer extends ActiveMQComponent { HierarchicalRepository<AddressSettings> getAddressSettingsRepository(); + OperationContext newOperationContext(); + int getConnectionCount(); long getTotalConnectionCount(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 9a0293e..8e86067 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -426,6 +426,11 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override + public OperationContext newOperationContext() { + return getStorageManager().newContext(getExecutorFactory().getExecutor()); + } + + @Override public final synchronized void start() throws Exception { if (state != SERVER_STATE.STOPPED) { logger.debug("Server already started!"); @@ -1190,7 +1195,8 @@ public class ActiveMQServerImpl 
implements ActiveMQServer { final boolean xa, final String defaultAddress, final SessionCallback callback, - final boolean autoCreateQueues) throws Exception { + final boolean autoCreateQueues, + final OperationContext context) throws Exception { String validatedUser = ""; if (securityStore != null) { @@ -1203,7 +1209,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { checkSessionLimit(validatedUser); - final OperationContext context = storageManager.newContext(getExecutorFactory().getExecutor()); final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues); sessions.put(name, session); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/resources/schema/artemis-configuration.xsd ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 4c3e068..8da84fe 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -46,6 +46,14 @@ </xsd:annotation> </xsd:element> + <xsd:element name="journal-datasync" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0"> + <xsd:annotation> + <xsd:documentation> + that means the server will use fdatasync to confirm writes on the disk. 
+ </xsd:documentation> + </xsd:annotation> + </xsd:element> + <xsd:element name="persistence-enabled" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0"> <xsd:annotation> <xsd:documentation> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 46f3958..c1639c7 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -359,6 +359,8 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals(1234567, conf.getGlobalMaxSize()); assertEquals(37, conf.getMaxDiskUsage()); assertEquals(123, conf.getDiskScanPeriod()); + + assertEquals(false, conf.isJournalDatasync()); } private void verifyAddresses() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 29119f8..7f01767 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -450,7 +450,7 @@ public abstract class ActiveMQTestBase extends Assert { * @throws Exception */ protected ConfigurationImpl 
createBasicConfig(final int serverID) { - ConfigurationImpl configuration = new ConfigurationImpl().setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(100 * 1024).setJournalType(getDefaultJournalType()).setJournalDirectory(getJournalDir(serverID, false)).setBindingsDirectory(getBindingsDir(serverID, false)).setPagingDirectory(getPageDir(serverID, false)).setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).setJournalCompactMinFiles(0).setJournalCompactPercentage(0).setClusterPassword(CLUSTER_PASSWORD); + ConfigurationImpl configuration = new ConfigurationImpl().setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(100 * 1024).setJournalType(getDefaultJournalType()).setJournalDirectory(getJournalDir(serverID, false)).setBindingsDirectory(getBindingsDir(serverID, false)).setPagingDirectory(getPageDir(serverID, false)).setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).setJournalCompactMinFiles(0).setJournalCompactPercentage(0).setClusterPassword(CLUSTER_PASSWORD).setJournalDatasync(false); return configuration; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/test/resources/ConfigurationTest-full-config.xml ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 3bc14bf..87dbd90 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -49,6 +49,7 @@ <message-expiry-scan-period>10111213</message-expiry-scan-period> <message-expiry-thread-priority>8</message-expiry-thread-priority> <id-cache-size>127</id-cache-size> + <journal-datasync>false</journal-datasync> <persist-id-cache>true</persist-id-cache> <populate-validated-user>true</populate-validated-user> 
<connection-ttl-check-interval>98765</connection-ttl-check-interval> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/docs/user-manual/en/configuration-index.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md index c47861b..65ef931 100644 --- a/docs/user-manual/en/configuration-index.md +++ b/docs/user-manual/en/configuration-index.md @@ -62,6 +62,7 @@ Name | Description [journal-sync-non-transactional](persistence.md) | if true wait for non transaction data to be synced to the journal before returning response to client. Default=true [journal-sync-transactional](persistence.md) | if true wait for transaction data to be synchronized to the journal before returning response to client. Default=true [journal-type](persistence.md) | the type of journal to use. Default=ASYNCIO +[journal-datasync](persistence.md) | if true use fdatasync to confirm journal writes on the disk. Default=true [large-messages-directory](large-messages.md "Configuring the server") | the directory to store large messages. Default=data/largemessages [management-address](management.md "Configuring Core Management") | the name of the management address to send management messages to. It is prefixed with "jms.queue" so that JMS clients can send messages to it. Default=jms.queue.activemq.management [management-notification-address](management.md "Configuring The Core Management Notification Address") | the name of the address that consumers bind to receive management notifications. 
Default=activemq.notifications http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/docs/user-manual/en/persistence.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/persistence.md b/docs/user-manual/en/persistence.md index cee06f4..6f9c481 100644 --- a/docs/user-manual/en/persistence.md +++ b/docs/user-manual/en/persistence.md @@ -298,6 +298,10 @@ The message journal is configured using the following attributes in data files on the journal The default for this parameter is `30` + +- `journal-datasync` (default: true) + + This enables or disables the use of fdatasync on journal writes. When `true` (the default) fdatasync is used to confirm writes on the disk; set it to `false` to disable fdatasync. ### An important note on disabling disk write cache.
