ARTEMIS-163 First pass on the native AIO refactoring https://issues.apache.org/jira/browse/ARTEMIS-163
On this pass I'm just converting the native layer to a simpler one. It wasn't very easy to change the alignment in the current framework, so I did some refactoring, simplifying the native layer. The number of changes here is large because: - The API is changed, we now don't close the libaio queue between files - The native layer won't use malloc as much as it used to, saving some CPU and memory defragmentation - I organized the code around nio and libaio Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6fe9e0eb Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6fe9e0eb Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6fe9e0eb Branch: refs/heads/master Commit: 6fe9e0ebd6504897ba2b3ce7185cc63a0dbc8feb Parents: 661f695 Author: Clebert Suconic <[email protected]> Authored: Mon Jul 27 16:15:03 2015 -0400 Committer: Clebert Suconic <[email protected]> Committed: Wed Jul 29 22:12:03 2015 -0400 ---------------------------------------------------------------------- .gitignore | 2 +- CMakeLists.txt | 18 + README.md | 2 +- .../activemq/artemis/cli/commands/Create.java | 90 +- .../cli/commands/tools/CompactJournal.java | 6 +- .../cli/commands/tools/DecodeJournal.java | 4 +- .../cli/commands/tools/EncodeJournal.java | 6 +- .../cli/commands/tools/XmlDataExporter.java | 6 +- .../cli/commands/util/SyncCalculation.java | 190 ++++ .../artemis/cli/commands/etc/broker.xml | 1 + .../commands/etc/journal-buffer-settings.txt | 8 + .../apache/activemq/cli/test/ArtemisTest.java | 23 +- .../activemq/cli/test/StreamClassPathTest.java | 1 + .../artemis/api/core/ActiveMQNativeIOError.java | 5 + .../journal/JMSJournalStorageManagerImpl.java | 6 +- .../artemis/core/asyncio/AIOCallback.java | 33 - .../artemis/core/asyncio/AsynchronousFile.java | 58 - .../artemis/core/asyncio/BufferCallback.java | 27 - .../core/asyncio/IOExceptionListener.java | 22 - 
.../core/asyncio/impl/ActiveMQFileLock.java | 47 - .../core/asyncio/impl/AsynchronousFileImpl.java | 822 -------------- .../artemis/core/io/AbstractSequentialFile.java | 407 +++++++ .../core/io/AbstractSequentialFileFactory.java | 224 ++++ .../activemq/artemis/core/io/DummyCallback.java | 49 + .../activemq/artemis/core/io/IOCallback.java | 33 + .../core/io/IOCriticalErrorListener.java | 25 + .../artemis/core/io/IOExceptionListener.java | 22 + .../artemis/core/io/SequentialFile.java | 116 ++ .../artemis/core/io/SequentialFileFactory.java | 91 ++ .../artemis/core/io/aio/AIOSequentialFile.java | 333 ++++++ .../core/io/aio/AIOSequentialFileFactory.java | 531 +++++++++ .../artemis/core/io/aio/ActiveMQFileLock.java | 47 + .../artemis/core/io/buffer/TimedBuffer.java | 558 ++++++++++ .../core/io/buffer/TimedBufferObserver.java | 53 + .../artemis/core/io/nio/NIOSequentialFile.java | 393 +++++++ .../core/io/nio/NIOSequentialFileFactory.java | 168 +++ .../artemis/core/journal/IOAsyncTask.java | 27 - .../artemis/core/journal/IOCompletion.java | 4 +- .../core/journal/IOCriticalErrorListener.java | 22 - .../activemq/artemis/core/journal/Journal.java | 1 + .../artemis/core/journal/SequentialFile.java | 129 --- .../core/journal/SequentialFileFactory.java | 89 -- .../core/journal/impl/AIOSequentialFile.java | 326 ------ .../journal/impl/AIOSequentialFileFactory.java | 358 ------ .../journal/impl/AbstractJournalUpdateTask.java | 8 +- .../journal/impl/AbstractSequentialFile.java | 407 ------- .../impl/AbstractSequentialFileFactory.java | 218 ---- .../core/journal/impl/DummyCallback.java | 48 - .../core/journal/impl/FileWrapperJournal.java | 2 +- .../artemis/core/journal/impl/JournalBase.java | 1 + .../core/journal/impl/JournalCompactor.java | 6 +- .../artemis/core/journal/impl/JournalFile.java | 2 +- .../core/journal/impl/JournalFileImpl.java | 2 +- .../journal/impl/JournalFilesRepository.java | 8 +- .../artemis/core/journal/impl/JournalImpl.java | 28 +- 
.../core/journal/impl/NIOSequentialFile.java | 404 ------- .../journal/impl/NIOSequentialFileFactory.java | 168 --- .../core/journal/impl/SyncSpeedTest.java | 354 ------ .../artemis/core/journal/impl/TimedBuffer.java | 558 ---------- .../core/journal/impl/TimedBufferObserver.java | 52 - .../core/journal/impl/TransactionCallback.java | 12 +- .../artemis/core/io/aio/CallbackOrderTest.java | 97 ++ .../artemis/maven/ActiveMQCreatePlugin.java | 1 + artemis-native/bin/libartemis-native-32.so | Bin 44082 -> 22260 bytes artemis-native/bin/libartemis-native-64.so | Bin 46624 -> 23984 bytes artemis-native/pom.xml | 24 + artemis-native/src/main/c/AIOController.cpp | 63 -- artemis-native/src/main/c/AIOController.h | 51 - artemis-native/src/main/c/AIOException.h | 75 -- artemis-native/src/main/c/AsyncFile.cpp | 348 ------ artemis-native/src/main/c/AsyncFile.h | 93 -- artemis-native/src/main/c/CMakeLists.txt | 26 +- artemis-native/src/main/c/CallbackAdapter.h | 42 - artemis-native/src/main/c/JAIODatatypes.h | 28 - .../src/main/c/JNICallbackAdapter.cpp | 62 -- artemis-native/src/main/c/JNICallbackAdapter.h | 66 -- .../src/main/c/JNI_AsynchronousFileImpl.cpp | 377 ------- artemis-native/src/main/c/JavaUtilities.cpp | 62 -- artemis-native/src/main/c/JavaUtilities.h | 26 - artemis-native/src/main/c/LockClass.h | 39 - artemis-native/src/main/c/Version.h | 24 - artemis-native/src/main/c/exception_helper.h | 23 + ...che_activemq_artemis_jlibaio_LibaioContext.c | 710 ++++++++++++ .../activemq/artemis/core/libaio/Native.java | 74 -- .../activemq/artemis/jlibaio/LibaioContext.java | 446 ++++++++ .../activemq/artemis/jlibaio/LibaioFile.java | 152 +++ .../activemq/artemis/jlibaio/NativeLogger.java | 51 + .../activemq/artemis/jlibaio/SubmitInfo.java | 25 + .../activemq/artemis/jlibaio/package-info.java | 24 + .../artemis/jlibaio/util/CallbackCache.java | 93 ++ .../jlibaio/test/CallbackCachelTest.java | 112 ++ .../artemis/jlibaio/test/LibaioTest.java | 859 +++++++++++++++ 
.../plug/ProtonSessionIntegrationCallback.java | 4 +- .../core/protocol/mqtt/MQTTPublishManager.java | 4 +- .../openwire/OpenWireProtocolManager.java | 4 +- .../protocol/stomp/StompProtocolManager.java | 4 +- .../deployers/impl/FileConfigurationParser.java | 2 +- .../artemis/core/paging/PagingManager.java | 2 +- .../artemis/core/paging/PagingStore.java | 2 +- .../artemis/core/paging/PagingStoreFactory.java | 2 +- .../cursor/impl/PageSubscriptionImpl.java | 6 +- .../activemq/artemis/core/paging/impl/Page.java | 4 +- .../core/paging/impl/PagingStoreFactoryNIO.java | 8 +- .../core/paging/impl/PagingStoreImpl.java | 10 +- .../core/persistence/OperationContext.java | 4 +- .../core/persistence/StorageManager.java | 6 +- .../impl/journal/DescribeJournal.java | 8 +- .../impl/journal/JournalStorageManager.java | 33 +- .../impl/journal/LargeServerMessageImpl.java | 2 +- .../impl/journal/LargeServerMessageInSync.java | 4 +- .../impl/journal/OperationContextImpl.java | 12 +- .../nullpm/NullStorageLargeServerMessage.java | 2 +- .../impl/nullpm/NullStorageManager.java | 8 +- .../core/postoffice/impl/PostOfficeImpl.java | 6 +- .../core/ServerSessionPacketHandler.java | 4 +- .../core/replication/ReplicatedJournal.java | 2 +- .../core/replication/ReplicationEndpoint.java | 6 +- .../core/replication/ReplicationManager.java | 2 +- .../core/server/ActiveMQServerLogger.java | 10 +- .../artemis/core/server/LargeServerMessage.java | 2 +- .../core/server/cluster/impl/Redistributor.java | 4 +- .../server/impl/AIOFileLockNodeManager.java | 29 +- .../core/server/impl/ActiveMQServerImpl.java | 18 +- .../artemis/core/server/impl/QueueImpl.java | 4 +- .../core/server/impl/ServerSessionImpl.java | 4 +- .../core/transaction/impl/TransactionImpl.java | 8 +- .../artemis/tests/util/ActiveMQTestBase.java | 66 +- .../tests/util/ColocatedActiveMQServer.java | 7 +- .../integration/client/HangConsumerTest.java | 6 +- .../integration/client/JournalCrashTest.java | 4 +- 
.../client/LibaioDependencyCheckTest.java | 4 +- .../tests/integration/client/PagingTest.java | 8 +- .../client/RedeliveryConsumerTest.java | 4 +- .../integration/cluster/bridge/BridgeTest.java | 6 +- .../journal/AIOImportExportTest.java | 6 +- .../journal/AIOJournalCompactTest.java | 6 +- .../integration/journal/AIOJournalImplTest.java | 10 +- .../journal/AIOSequentialFileFactoryTest.java | 10 +- .../journal/NIOBufferedJournalCompactTest.java | 6 +- .../journal/NIOImportExportTest.java | 6 +- .../journal/NIOJournalCompactTest.java | 16 +- .../integration/journal/NIOJournalImplTest.java | 6 +- .../journal/NIONoBufferJournalImplTest.java | 6 +- ...NIONonBufferedSequentialFileFactoryTest.java | 6 +- .../journal/NIOSequentialFileFactoryTest.java | 6 +- .../journal/ValidateTransactionHealthTest.java | 15 +- .../management/ActiveMQServerControlTest.java | 12 +- .../replication/ReplicationTest.java | 16 +- .../integration/server/FileLockTimeoutTest.java | 4 +- .../journal/FakeJournalImplTest.java | 2 +- .../journal/JournalImplTestUnit.java | 13 +- .../journal/RealJournalImplAIOTest.java | 6 +- .../journal/RealJournalImplNIOTest.java | 6 +- .../AIOAllPossibilitiesCompactStressTest.java | 6 +- .../AIOMultiThreadCompactorStressTest.java | 2 +- .../stress/journal/AddAndRemoveStressTest.java | 16 +- .../stress/journal/CompactingStressTest.java | 12 +- .../JournalCleanupCompactStressTest.java | 50 +- .../stress/journal/MixupCompactorTestBase.java | 6 +- .../NIOMultiThreadCompactorStressTest.java | 8 +- .../core/journal/impl/AIOJournalImplTest.java | 6 +- .../core/journal/impl/FakeJournalImplTest.java | 2 +- .../core/journal/impl/JournalImplTestUnit.java | 15 +- .../core/journal/impl/NIOJournalImplTest.java | 6 +- .../tests/unit/core/asyncio/AIOTestBase.java | 18 +- .../unit/core/asyncio/AsynchronousFileTest.java | 1015 ------------------ .../MultiThreadAsynchronousFileTest.java | 81 +- .../journal/impl/AlignedJournalImplTest.java | 14 +- 
.../unit/core/journal/impl/CleanBufferTest.java | 16 +- .../core/journal/impl/FakeJournalImplTest.java | 2 +- .../impl/FakeSequentialFileFactoryTest.java | 2 +- .../core/journal/impl/FileFactoryTestBase.java | 18 +- .../core/journal/impl/JournalImplTestBase.java | 6 +- .../core/journal/impl/JournalImplTestUnit.java | 9 +- .../unit/core/journal/impl/ReclaimerTest.java | 2 +- .../impl/SequentialFileFactoryTestBase.java | 52 +- .../unit/core/journal/impl/TimedBufferTest.java | 26 +- .../impl/fakes/FakeSequentialFileFactory.java | 84 +- .../tests/unit/core/paging/impl/PageTest.java | 18 +- .../core/paging/impl/PagingStoreImplTest.java | 14 +- .../impl/BatchIDGeneratorUnitTest.java | 4 +- .../impl/OperationContextUnitTest.java | 8 +- .../unit/core/server/impl/FileLockTest.java | 4 +- 183 files changed, 6522 insertions(+), 7227 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 71af624..ce57485 100644 --- a/.gitignore +++ b/.gitignore @@ -17,4 +17,4 @@ ratReport.txt **/cmake_install.cmake # this file is generated -artemis-native/src/main/c/org_apache_activemq_artemis_core_libaio_Native.h +artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.h http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..4681205 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +CMAKE_MINIMUM_REQUIRED(VERSION 2.6) + +SUBDIRS(artemis-native) http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 65155f3..195441f 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ This file describes some minimum 'stuff one needs to know' to get started coding ## Source -For details about the modifying the code, building the project, running tests, IDE integration, etc. see +For details about the modifying the code, building the project, running tests, IDE integration, etc. see our [Hacking Guide](./docs/hacking-guide/en/SUMMARY.md). 
## Examples http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java index 7495ce2..2fdf5f2 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java @@ -26,6 +26,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.attribute.PosixFilePermission; +import java.text.DecimalFormat; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -36,8 +37,10 @@ import java.util.regex.Pattern; import io.airlift.airline.Arguments; import io.airlift.airline.Command; import io.airlift.airline.Option; -import org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl; +import org.apache.activemq.artemis.cli.commands.util.SyncCalculation; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.jlibaio.LibaioContext; +import org.apache.activemq.artemis.jlibaio.LibaioFile; import static java.nio.file.attribute.PosixFilePermission.GROUP_EXECUTE; import static java.nio.file.attribute.PosixFilePermission.GROUP_READ; @@ -84,6 +87,7 @@ public class Create extends InputAbstract public static final String ETC_CLUSTER_SETTINGS_TXT = "etc/cluster-settings.txt"; public static final String ETC_CONNECTOR_SETTINGS_TXT = "etc/connector-settings.txt"; public static final String ETC_BOOTSTRAP_WEB_SETTINGS_TXT = "etc/bootstrap-web-settings.txt"; + public static final String ETC_JOURNAL_BUFFER_SETTINGS = "etc/journal-buffer-settings.txt"; @Arguments(description = "The instance directory to hold the 
broker's configuration and data. Path must be writable.", required = true) File directory; @@ -142,6 +146,9 @@ public class Create extends InputAbstract @Option(name = "--require-login", description = "This will configure security to require user / password, opposite of --allow-anonymous") Boolean requireLogin = null; + @Option(name = "--no-sync-test", description = "Disable the calculation for the buffer") + boolean noSyncTest; + @Option(name = "--user", description = "The username (Default: input)") String user; @@ -529,12 +536,16 @@ public class Create extends InputAbstract filters.put("${shared-store.settings}", ""); } - if (IS_WINDOWS || !AsynchronousFileImpl.isLoaded()) + boolean aio; + + if (IS_WINDOWS || !supportsLibaio()) { + aio = false; filters.put("${journal.settings}", "NIO"); } else { + aio = true; filters.put("${journal.settings}", "ASYNCIO"); } @@ -590,7 +601,8 @@ public class Create extends InputAbstract new File(directory, "etc").mkdirs(); new File(directory, "log").mkdirs(); new File(directory, "tmp").mkdirs(); - new File(directory, "data").mkdirs(); + File dataFolder = new File(directory, "data"); + dataFolder.mkdirs(); if (javaOptions == null || javaOptions.length() == 0) { @@ -638,7 +650,7 @@ public class Create extends InputAbstract filters.put("${bootstrap-web-settings}", applyFilters(readTextFile(ETC_BOOTSTRAP_WEB_SETTINGS_TXT), filters)); } - + performSyncCalc(filters, aio, dataFolder); write(ETC_BOOTSTRAP_XML, filters, false); write(ETC_BROKER_XML, filters, false); @@ -694,6 +706,76 @@ public class Create extends InputAbstract return null; } + private void performSyncCalc(HashMap<String, String> filters, boolean aio, File dataFolder) + { + if (noSyncTest) + { + filters.put("${journal-buffer.settings}", ""); + } + else + { + try + { + int writes = 250; + System.out.println(""); + System.out.println("Performing write sync calculation..."); + + long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, aio); + long 
nanoseconds = SyncCalculation.toNanos(time, writes); + double writesPerMillisecond = (double)writes / (double) time; + + String writesPerMillisecondStr = new DecimalFormat("###.##").format(writesPerMillisecond); + + HashMap<String, String> syncFilter = new HashMap<String, String>(); + syncFilter.put("${nanoseconds}", Long.toString(nanoseconds)); + syncFilter.put("${writesPerMillisecond}", writesPerMillisecondStr); + + System.out.println("done! Your system can make " + writesPerMillisecondStr + + " writes per millisecond, your journal-buffer-timeout will be " + nanoseconds); + + filters.put("${journal-buffer.settings}", applyFilters(readTextFile(ETC_JOURNAL_BUFFER_SETTINGS), syncFilter)); + + } + catch (Exception e) + { + filters.put("${journal-buffer.settings}", ""); + e.printStackTrace(); + System.err.println("Couldn't perform sync calculation, using default values"); + } + } + } + + private boolean supportsLibaio() + { + if (LibaioContext.isLoaded()) + { + try (LibaioContext context = new LibaioContext(1, true)) + { + File tmpFile = new File(directory, "validateAIO.bin"); + boolean supportsLibaio = true; + try + { + LibaioFile file = context.openFile(tmpFile, true); + file.close(); + } + catch (Exception e) + { + supportsLibaio = false; + } + tmpFile.delete(); + if (!supportsLibaio) + { + System.err.println("The filesystem used on " + directory + " doesn't support libAIO and O_DIRECT files, switching journal-type to NIO"); + } + return supportsLibaio; + } + } + else + { + return false; + } + } + private void makeExec(String path) throws IOException { try http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/CompactJournal.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/CompactJournal.java 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/CompactJournal.java index 0239998..38fb785 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/CompactJournal.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/CompactJournal.java @@ -22,9 +22,9 @@ import io.airlift.airline.Command; import org.apache.activemq.artemis.cli.commands.Action; import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.core.config.Configuration; -import org.apache.activemq.artemis.core.journal.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; @Command(name = "compact", description = "Compacts the journal of a non running server") public final class CompactJournal extends DataAbstract implements Action @@ -54,7 +54,7 @@ public final class CompactJournal extends DataAbstract implements Action final int fileSize, final IOCriticalErrorListener listener) throws Exception { - NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener); + NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener, 1); JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java index 5b28a18..6b608c3 100644 --- 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java @@ -35,7 +35,7 @@ import org.apache.activemq.artemis.cli.commands.Configurable; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.core.journal.impl.JournalRecord; -import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.utils.Base64; @Command(name = "decode", description = "Decode a journal's internal format into a new journal set of files") @@ -117,7 +117,7 @@ public class DecodeJournal extends Configurable implements Action System.err.println("Could not create directory " + directory); } - NIOSequentialFileFactory nio = new NIOSequentialFileFactory(new File(directory), null); + NIOSequentialFileFactory nio = new NIOSequentialFileFactory(new File(directory), null, 1); JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java index 8b0721b..a408951 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java @@ -28,11 +28,11 @@ import org.apache.activemq.artemis.cli.commands.Action; import 
org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.cli.commands.Configurable; import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback; -import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.utils.Base64; @Command(name = "encode", description = "Encode a set of journal files into an internal encoded data format") @@ -113,7 +113,7 @@ public class EncodeJournal extends Configurable implements Action final int fileSize, final PrintStream out) throws Exception { - NIOSequentialFileFactory nio = new NIOSequentialFileFactory(new File(directory), null); + NIOSequentialFileFactory nio = new NIOSequentialFileFactory(new File(directory), null, 1); JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java index e099a0b..cf5be12 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java @@ -52,10 +52,10 @@ import 
org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.message.BodyEncoder; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; @@ -345,7 +345,7 @@ public final class XmlDataExporter extends DataAbstract implements Action private void getJmsBindings() throws Exception { - SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation()); + SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1); Journal jmsJournal = new JournalImpl(1024 * 1024, 2, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java new file mode 100644 index 0000000..b5a8845 --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.cli.commands.util; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.text.DecimalFormat; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.jlibaio.LibaioContext; +import org.apache.activemq.artemis.utils.ReusableLatch; + +/** + * It will perform a simple test to evaluate how many syncs a disk can make per second + * * * + */ +public class SyncCalculation +{ + /** + * It will perform a write test of blockSize * blocks, syncing on each write, for N tries. + * It will return the lowest spent time from the tries. 
+ */ + public static long syncTest(File datafolder, int blockSize, int blocks, int tries, boolean verbose, boolean aio) throws Exception + { + SequentialFileFactory factory = newFactory(datafolder, aio); + SequentialFile file = factory.createSequentialFile("test.tmp"); + + try + { + file.delete(); + file.open(); + + file.fill(blockSize * blocks); + + long[] result = new long[tries]; + + byte[] block = new byte[blockSize]; + + for (int i = 0; i < block.length; i++) + { + block[i] = (byte) 't'; + } + + ByteBuffer bufferBlock = factory.newBuffer(blockSize); + bufferBlock.put(block); + bufferBlock.position(0); + + final ReusableLatch latch = new ReusableLatch(0); + + IOCallback callback = new IOCallback() + { + @Override + public void done() + { + latch.countDown(); + } + + @Override + public void onError(int errorCode, String errorMessage) + { + + } + }; + + DecimalFormat dcformat = new DecimalFormat("###.##"); + for (int ntry = 0; ntry < tries; ntry++) + { + + if (verbose) + { + System.out.println("**************************************************"); + System.out.println(ntry + " of " + tries + " calculation"); + } + file.position(0); + long start = System.currentTimeMillis(); + for (int i = 0; i < blocks; i++) + { + bufferBlock.position(0); + latch.countUp(); + file.writeDirect(bufferBlock, true, callback); + if (!latch.await(5, TimeUnit.SECONDS)) + { + throw new IOException("Callback wasn't called"); + } + } + long end = System.currentTimeMillis(); + + result[ntry] = (end - start); + + if (verbose) + { + double writesPerMillisecond = (double)blocks / (double) result[ntry]; + System.out.println("Time = " + result[ntry]); + System.out.println("Writes / millisecond = " + dcformat.format(writesPerMillisecond)); + System.out.println("bufferTimeout = " + toNanos(result[ntry], blocks)); + System.out.println("**************************************************"); + } + } + + factory.releaseDirectBuffer(bufferBlock); + + long totalTime = Long.MAX_VALUE; + for (int i = 0; i 
< tries; i++) + { + if (result[i] < totalTime) + { + totalTime = result[i]; + } + } + + return totalTime; + } + finally + { + try + { + file.close(); + } + catch (Exception e) + { + } + try + { + file.delete(); + } + catch (Exception e) + { + } + try + { + factory.stop(); + } + catch (Exception e) + { + } + } + } + + + public static long toNanos(long time, long blocks) + { + + double blocksPerMillisecond = (double) blocks / (double) (time); + + long nanoSeconds = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS); + + long timeWait = (long) (nanoSeconds / blocksPerMillisecond); + + return timeWait; + } + + private static SequentialFileFactory newFactory(File datafolder, boolean aio) + { + if (aio && LibaioContext.isLoaded()) + { + SequentialFileFactory factory = new AIOSequentialFileFactory(datafolder, 1); + factory.start(); + ((AIOSequentialFileFactory) factory).disableBufferReuse(); + + return factory; + } + else + { + SequentialFileFactory factory = new NIOSequentialFileFactory(datafolder, 1); + factory.start(); + return factory; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml index 52d665e..1cd3245 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml @@ -42,6 +42,7 @@ under the License. <large-messages-directory>${data.dir}/large-messages</large-messages-directory> <journal-min-files>10</journal-min-files> +${journal-buffer.settings} ${connector-config.settings} <acceptors> <!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. 
Currently supports Core, OpenWire, Stomp and AMQP. --> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/journal-buffer-settings.txt ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/journal-buffer-settings.txt b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/journal-buffer-settings.txt new file mode 100644 index 0000000..566c29e --- /dev/null +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/journal-buffer-settings.txt @@ -0,0 +1,8 @@ + + <!-- + This value was determined through a calculation. + Your system could perform ${writesPerMillisecond} writes per millisecond + on the current journal configuration. + That translates as a sync write every ${nanoseconds} nanoseconds + --> + <journal-buffer-timeout>${nanoseconds}</journal-buffer-timeout> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java index 99c8f23..3aed71e 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java @@ -25,6 +25,8 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.cli.Artemis; import org.apache.activemq.artemis.cli.commands.Run; +import org.apache.activemq.artemis.cli.commands.util.SyncCalculation; +import org.apache.activemq.artemis.jlibaio.LibaioContext; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import 
org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.junit.After; @@ -69,15 +71,29 @@ public class ArtemisTest } @Test + public void testSync() throws Exception + { + int writes = 2560; + int tries = 10; + long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true); + System.out.println(); + System.out.println("TotalAvg = " + totalAvg); + long nanoTime = SyncCalculation.toNanos(totalAvg, writes); + System.out.println("nanoTime avg = " + nanoTime); + Assert.assertEquals(0, LibaioContext.getTotalMaxIO()); + + } + + @Test public void testSimpleRun() throws Exception { Run.setEmbedded(true); Artemis.main("create", temporaryFolder.getRoot().getAbsolutePath(), "--force", "--silent-input", "--no-web"); System.setProperty("artemis.instance", temporaryFolder.getRoot().getAbsolutePath()); // Some exceptions may happen on the initialization, but they should be ok on start the basic core protocol - Artemis.main("run"); - Assert.assertEquals(Integer.valueOf(70), Artemis.execute("producer", "--txt-size", "50", "--message-count", "70", "--verbose")); - Assert.assertEquals(Integer.valueOf(70), Artemis.execute("consumer", "--txt-size", "50", "--verbose", "--break-on-null", "--receive-timeout", "100")); + Artemis.execute("run"); + Assert.assertEquals(Integer.valueOf(1000), Artemis.execute("producer", "--message-count", "1000", "--verbose")); + Assert.assertEquals(Integer.valueOf(1000), Artemis.execute("consumer", "--verbose", "--break-on-null", "--receive-timeout", "100")); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = cf.createConnection(); @@ -116,6 +132,7 @@ public class ArtemisTest Artemis.execute("stop"); Assert.assertTrue(Run.latchRunning.await(5, TimeUnit.SECONDS)); + Assert.assertEquals(0, LibaioContext.getTotalMaxIO()); } 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-cli/src/test/java/org/apache/activemq/cli/test/StreamClassPathTest.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/StreamClassPathTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/StreamClassPathTest.java index 632e2c0..0ee3afb 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/StreamClassPathTest.java +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/StreamClassPathTest.java @@ -49,6 +49,7 @@ public class StreamClassPathTest openStream(Create.ETC_CLUSTER_SETTINGS_TXT); openStream(Create.ETC_CONNECTOR_SETTINGS_TXT); openStream(Create.ETC_BOOTSTRAP_WEB_SETTINGS_TXT); + openStream(Create.ETC_JOURNAL_BUFFER_SETTINGS); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQNativeIOError.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQNativeIOError.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQNativeIOError.java index 8a47dfa..e69a7fd 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQNativeIOError.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQNativeIOError.java @@ -34,4 +34,9 @@ public final class ActiveMQNativeIOError extends ActiveMQException { super(ActiveMQExceptionType.NATIVE_ERROR_CANT_INITIALIZE_AIO, msg); } + + public ActiveMQNativeIOError(String msg, Throwable e) + { + super(ActiveMQExceptionType.NATIVE_ERROR_CANT_INITIALIZE_AIO, msg, e); + } } 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java index b3ef038..2d884e7 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java @@ -29,9 +29,9 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.replication.ReplicatedJournal; import org.apache.activemq.artemis.core.replication.ReplicationManager; import org.apache.activemq.artemis.core.server.JournalType; @@ -87,7 +87,7 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager createDir = config.isCreateBindingsDir(); - SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation()); + SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1); Journal 
localJMS = new JournalImpl(1024 * 1024, 2, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/AIOCallback.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/AIOCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/AIOCallback.java deleted file mode 100644 index 80aa753..0000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/AIOCallback.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.asyncio; - -/** - * The interface used for AIO Callbacks. - */ -public interface AIOCallback -{ - /** - * Method for sync notifications. When this callback method is called, there is a guarantee the data is written on the disk. - * <br><b>Note:</b><i>Leave this method as soon as possible, or you would be blocking the whole notification thread</i> */ - void done(); - - /** - * Method for error notifications. - * Observation: The whole file will be probably failing if this happens. 
Like, if you delete the file, you will start to get errors for these operations*/ - void onError(int errorCode, String errorMessage); -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/AsynchronousFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/AsynchronousFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/AsynchronousFile.java deleted file mode 100644 index 52b8e05..0000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/AsynchronousFile.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.core.asyncio; - -import java.nio.ByteBuffer; - -import org.apache.activemq.artemis.api.core.ActiveMQException; - -public interface AsynchronousFile -{ - void close() throws InterruptedException, ActiveMQException; - - /** - * - * Note: If you are using a native Linux implementation, maxIO can't be higher than what's defined on /proc/sys/fs/aio-max-nr, or you would get an error - * @param fileName - * @param maxIO The number of max concurrent asynchronous IO operations. It has to be balanced between the size of your writes and the capacity of your disk. - * @throws ActiveMQException - */ - void open(String fileName, int maxIO) throws ActiveMQException; - - /** - * Warning: This function will perform a synchronous IO, probably translating to a fstat call - * @throws ActiveMQException - * */ - long size() throws ActiveMQException; - - /** Any error will be reported on the callback interface */ - void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback); - - /** - * Performs an internal direct write. 
- * @throws ActiveMQException - */ - void writeInternal(long positionToWrite, long size, ByteBuffer bytes) throws ActiveMQException; - - void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback) throws ActiveMQException; - - void fill(long position, int blocks, long size, byte fillChar) throws ActiveMQException; - - void setBufferCallback(BufferCallback callback); - - int getBlockSize(); -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/BufferCallback.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/BufferCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/BufferCallback.java deleted file mode 100644 index e7b0ca5..0000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/BufferCallback.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.core.asyncio; - -import java.nio.ByteBuffer; - -/** - * Used to receive a notification on completed buffers used by the AIO layer. - */ -public interface BufferCallback -{ - void bufferDone(ByteBuffer buffer); -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/IOExceptionListener.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/IOExceptionListener.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/IOExceptionListener.java deleted file mode 100644 index 0cfe945..0000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/IOExceptionListener.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.core.asyncio; - -public interface IOExceptionListener -{ - void onIOException(Exception exception, String message); -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/impl/ActiveMQFileLock.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/impl/ActiveMQFileLock.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/impl/ActiveMQFileLock.java deleted file mode 100644 index 0af3152..0000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/impl/ActiveMQFileLock.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.core.asyncio.impl; - -import java.io.IOException; -import java.nio.channels.FileChannel; -import java.nio.channels.FileLock; - -import org.apache.activemq.artemis.core.libaio.Native; - -public class ActiveMQFileLock extends FileLock -{ - - private final int handle; - - protected ActiveMQFileLock(final int handle) - { - super((FileChannel)null, 0, 0, false); - this.handle = handle; - } - - @Override - public boolean isValid() - { - return true; - } - - @Override - public void release() throws IOException - { - Native.closeFile(handle); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/impl/AsynchronousFileImpl.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/impl/AsynchronousFileImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/impl/AsynchronousFileImpl.java deleted file mode 100644 index be4d885..0000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/impl/AsynchronousFileImpl.java +++ /dev/null @@ -1,822 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.asyncio.impl; - -import java.nio.ByteBuffer; -import java.nio.channels.FileLock; -import java.util.PriorityQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; -import org.apache.activemq.artemis.core.asyncio.AIOCallback; -import org.apache.activemq.artemis.core.asyncio.AsynchronousFile; -import org.apache.activemq.artemis.core.asyncio.BufferCallback; -import org.apache.activemq.artemis.core.asyncio.IOExceptionListener; -import org.apache.activemq.artemis.core.libaio.Native; -import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; -import org.apache.activemq.artemis.utils.ReusableLatch; - -/** - * AsynchronousFile implementation - * - * Warning: Case you refactor the name or the package of this class - * You need to make sure you also rename the C++ native calls - */ -public class AsynchronousFileImpl implements AsynchronousFile -{ - // Static ---------------------------------------------------------------------------- - - private static final AtomicInteger totalMaxIO = new AtomicInteger(0); - - private static boolean loaded = false; - - /** - * This definition needs to match Version.h on the native sources. 
- * <br> - * Or else the native module won't be loaded because of version mismatches - */ - private static final int EXPECTED_NATIVE_VERSION = 52; - - /** - * Used to determine the next writing sequence - */ - private final AtomicLong nextWritingSequence = new AtomicLong(0); - - /** - * Used to determine the next writing sequence. - * This is accessed from a single thread (the Poller Thread) - */ - private long nextReadSequence = 0; - - /** - * AIO can't guarantee ordering over callbacks. - * <br> - * We use this {@link PriorityQueue} to hold values until they are in order - */ - private final PriorityQueue<CallbackHolder> pendingCallbacks = new PriorityQueue<CallbackHolder>(); - - public static void addMax(final int io) - { - AsynchronousFileImpl.totalMaxIO.addAndGet(io); - } - - /** - * For test purposes - */ - public static int getTotalMaxIO() - { - return AsynchronousFileImpl.totalMaxIO.get(); - } - - public static void resetMaxAIO() - { - AsynchronousFileImpl.totalMaxIO.set(0); - } - - public static int openFile(String fileName) - { - return Native.openFile(fileName); - } - - public static void closeFile(int handle) - { - Native.closeFile(handle); - } - - public static void destroyBuffer(ByteBuffer buffer) - { - Native.destroyBuffer(buffer); - } - - private static boolean loadLibrary(final String name) - { - try - { - ActiveMQJournalLogger.LOGGER.trace(name + " being loaded"); - System.loadLibrary(name); - if (Native.getNativeVersion() != AsynchronousFileImpl.EXPECTED_NATIVE_VERSION) - { - ActiveMQJournalLogger.LOGGER.incompatibleNativeLibrary(); - return false; - } - else - { - return true; - } - } - catch (Throwable e) - { - ActiveMQJournalLogger.LOGGER.debug(name + " -> error loading the native library", e); - return false; - } - - } - - static - { - String[] libraries = new String[]{"artemis-native", "artemis-native-64", "artemis-native-32"}; - - for (String library : libraries) - { - if (AsynchronousFileImpl.loadLibrary(library)) - { - 
AsynchronousFileImpl.loaded = true; - break; - } - else - { - ActiveMQJournalLogger.LOGGER.debug("Library " + library + " not found!"); - } - } - - if (!AsynchronousFileImpl.loaded) - { - ActiveMQJournalLogger.LOGGER.debug("Couldn't locate LibAIO Wrapper"); - } - } - - public static boolean isLoaded() - { - return AsynchronousFileImpl.loaded; - } - - // Attributes ------------------------------------------------------------------------ - - private boolean opened = false; - - private String fileName; - - /** - * Used while inside the callbackDone and callbackError - */ - private final Lock callbackLock = new ReentrantLock(); - - private final ReusableLatch pollerLatch = new ReusableLatch(); - - private volatile Runnable poller; - - private int maxIO; - - private final Lock writeLock = new ReentrantReadWriteLock().writeLock(); - - private final ReusableLatch pendingWrites = new ReusableLatch(); - - private Semaphore maxIOSemaphore; - - private BufferCallback bufferCallback; - - /** - * A callback for IO errors when they happen - */ - private final IOExceptionListener ioExceptionListener; - - /** - * Warning: Beware of the C++ pointer! It will bite you! :-) - */ - private ByteBuffer handler; - - // A context switch on AIO would make it to synchronize the disk before - // switching to the new thread, what would cause - // serious performance problems. Because of that we make all the writes on - // AIO using a single thread. - private final Executor writeExecutor; - - private final Executor pollerExecutor; - - // AsynchronousFile implementation --------------------------------------------------- - - /** - * @param writeExecutor It needs to be a single Thread executor. 
If null it will use the user thread to execute write operations - * @param pollerExecutor The thread pool that will initialize poller handlers - */ - public AsynchronousFileImpl(final Executor writeExecutor, final Executor pollerExecutor, final IOExceptionListener ioExceptionListener) - { - this.writeExecutor = writeExecutor; - this.pollerExecutor = pollerExecutor; - this.ioExceptionListener = ioExceptionListener; - } - - public AsynchronousFileImpl(final Executor writeExecutor, final Executor pollerExecutor) - { - this(writeExecutor, pollerExecutor, null); - } - - public void open(final String fileName1, final int maxIOArgument) throws ActiveMQException - { - writeLock.lock(); - - try - { - if (opened) - { - throw new IllegalStateException("AsynchronousFile is already opened"); - } - - this.maxIO = maxIOArgument; - maxIOSemaphore = new Semaphore(this.maxIO); - - this.fileName = fileName1; - - try - { - handler = Native.init(AsynchronousFileImpl.class, fileName1, this.maxIO, ActiveMQJournalLogger.LOGGER); - } - catch (ActiveMQException e) - { - ActiveMQException ex = null; - if (e.getType() == ActiveMQExceptionType.NATIVE_ERROR_CANT_INITIALIZE_AIO) - { - ex = new ActiveMQException(e.getType(), - "Can't initialize AIO. 
Currently AIO in use = " + AsynchronousFileImpl.totalMaxIO.get() + - ", trying to allocate more " + - maxIOArgument, - e); - } - else - { - ex = e; - } - throw ex; - } - opened = true; - AsynchronousFileImpl.addMax(this.maxIO); - nextWritingSequence.set(0); - nextReadSequence = 0; - } - finally - { - writeLock.unlock(); - } - } - - public void close() throws InterruptedException, ActiveMQException - { - checkOpened(); - - writeLock.lock(); - - try - { - - while (!pendingWrites.await(60000)) - { - ActiveMQJournalLogger.LOGGER.couldNotGetLock(fileName); - } - - while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS)) - { - ActiveMQJournalLogger.LOGGER.couldNotGetLock(fileName); - } - - maxIOSemaphore = null; - if (poller != null) - { - stopPoller(); - } - - if (handler != null) - { - Native.closeInternal(handler); - AsynchronousFileImpl.addMax(-maxIO); - } - opened = false; - handler = null; - } - finally - { - writeLock.unlock(); - } - } - - - public void writeInternal(long positionToWrite, long size, ByteBuffer bytes) throws ActiveMQException - { - try - { - Native.writeInternal(handler, positionToWrite, size, bytes); - } - catch (ActiveMQException e) - { - fireExceptionListener(e.getType().getCode(), e.getMessage()); - throw e; - } - if (bufferCallback != null) - { - bufferCallback.bufferDone(bytes); - } - } - - - public void write(final long position, - final long size, - final ByteBuffer directByteBuffer, - final AIOCallback aioCallback) - { - if (aioCallback == null) - { - throw new NullPointerException("Null Callback"); - } - - checkOpened(); - if (poller == null) - { - startPoller(); - } - - pendingWrites.countUp(); - - if (writeExecutor != null) - { - maxIOSemaphore.acquireUninterruptibly(); - - writeExecutor.execute(new Runnable() - { - public void run() - { - long sequence = nextWritingSequence.getAndIncrement(); - - try - { - Native.write(AsynchronousFileImpl.this, handler, sequence, position, size, directByteBuffer, aioCallback); - } - catch 
(ActiveMQException e) - { - callbackError(aioCallback, sequence, directByteBuffer, e.getType().getCode(), e.getMessage()); - } - catch (RuntimeException e) - { - callbackError(aioCallback, - sequence, - directByteBuffer, - ActiveMQExceptionType.INTERNAL_ERROR.getCode(), - e.getMessage()); - } - } - }); - } - else - { - maxIOSemaphore.acquireUninterruptibly(); - - long sequence = nextWritingSequence.getAndIncrement(); - - try - { - Native.write(this, handler, sequence, position, size, directByteBuffer, aioCallback); - } - catch (ActiveMQException e) - { - callbackError(aioCallback, sequence, directByteBuffer, e.getType().getCode(), e.getMessage()); - } - catch (RuntimeException e) - { - callbackError(aioCallback, sequence, directByteBuffer, ActiveMQExceptionType.INTERNAL_ERROR.getCode(), e.getMessage()); - } - } - - } - - public void read(final long position, - final long size, - final ByteBuffer directByteBuffer, - final AIOCallback aioPackage) throws ActiveMQException - { - checkOpened(); - if (poller == null) - { - startPoller(); - } - pendingWrites.countUp(); - maxIOSemaphore.acquireUninterruptibly(); - try - { - Native.read(this, handler, position, size, directByteBuffer, aioPackage); - } - catch (ActiveMQException e) - { - // Release only if an exception happened - maxIOSemaphore.release(); - pendingWrites.countDown(); - throw e; - } - catch (RuntimeException e) - { - // Release only if an exception happened - maxIOSemaphore.release(); - pendingWrites.countDown(); - throw e; - } - } - - public long size() throws ActiveMQException - { - checkOpened(); - return Native.size0(handler); - } - - public void fill(final long position, final int blocks, final long size, final byte fillChar) throws ActiveMQException - { - checkOpened(); - try - { - Native.fill(handler, position, blocks, size, fillChar); - } - catch (ActiveMQException e) - { - fireExceptionListener(e.getType().getCode(), e.getMessage()); - throw e; - } - } - - public int getBlockSize() - { - return 512; 
- } - - /** - * This needs to be synchronized because of - * http://bugs.sun.com/view_bug.do?bug_id=6791815 - * http://mail.openjdk.java.net/pipermail/hotspot-runtime-dev/2009-January/000386.html - */ - public static synchronized ByteBuffer newBuffer(final int size) - { - if (size % 512 != 0) - { - throw new RuntimeException("Buffer size needs to be aligned to 512"); - } - - return Native.newNativeBuffer(size); - } - - public void setBufferCallback(final BufferCallback callback) - { - bufferCallback = callback; - } - - /** - * Return the JNI handler used on C++ - */ - public ByteBuffer getHandler() - { - return handler; - } - - public static void clearBuffer(final ByteBuffer buffer) - { - Native.resetBuffer(buffer, buffer.limit()); - buffer.position(0); - } - - // Protected ------------------------------------------------------------------------- - - @Override - protected void finalize() - { - if (opened) - { - ActiveMQJournalLogger.LOGGER.fileFinalizedWhileOpen(fileName); - } - } - - // Private --------------------------------------------------------------------------- - - private void callbackDone(final AIOCallback callback, final long sequence, final ByteBuffer buffer) - { - maxIOSemaphore.release(); - - pendingWrites.countDown(); - - callbackLock.lock(); - - try - { - - if (sequence == -1) - { - callback.done(); - } - else - { - if (sequence == nextReadSequence) - { - nextReadSequence++; - callback.done(); - flushCallbacks(); - } - else - { - pendingCallbacks.add(new CallbackHolder(sequence, callback)); - } - } - - // The buffer is not sent on callback for read operations - if (bufferCallback != null && buffer != null) - { - bufferCallback.bufferDone(buffer); - } - } - finally - { - callbackLock.unlock(); - } - } - - private void flushCallbacks() - { - while (!pendingCallbacks.isEmpty() && pendingCallbacks.peek().sequence == nextReadSequence) - { - CallbackHolder holder = pendingCallbacks.poll(); - if (holder.isError()) - { - ErrorCallback error = 
(ErrorCallback) holder; - holder.callback.onError(error.errorCode, error.message); - } - else - { - holder.callback.done(); - } - nextReadSequence++; - } - } - - // Called by the JNI layer.. just ignore the - // warning - private void callbackError(final AIOCallback callback, - final long sequence, - final ByteBuffer buffer, - final int errorCode, - final String errorMessage) - { - ActiveMQJournalLogger.LOGGER.callbackError(errorMessage); - - fireExceptionListener(errorCode, errorMessage); - - maxIOSemaphore.release(); - - pendingWrites.countDown(); - - callbackLock.lock(); - - try - { - if (sequence == -1) - { - callback.onError(errorCode, errorMessage); - } - else - { - if (sequence == nextReadSequence) - { - nextReadSequence++; - callback.onError(errorCode, errorMessage); - flushCallbacks(); - } - else - { - pendingCallbacks.add(new ErrorCallback(sequence, callback, errorCode, errorMessage)); - } - } - } - finally - { - callbackLock.unlock(); - } - - // The buffer is not sent on callback for read operations - if (bufferCallback != null && buffer != null) - { - bufferCallback.bufferDone(buffer); - } - } - - /** - * This is called by the native layer - * - * @param errorCode - * @param errorMessage - */ - private void fireExceptionListener(final int errorCode, final String errorMessage) - { - ActiveMQJournalLogger.LOGGER.ioError(errorCode, errorMessage); - if (ioExceptionListener != null) - { - ioExceptionListener.onIOException(ActiveMQExceptionType.getType(errorCode).createException(errorMessage), errorMessage); - } - } - - private void pollEvents() - { - if (!opened) - { - return; - } - Native.internalPollEvents(handler); - } - - private void startPoller() - { - writeLock.lock(); - - try - { - - if (poller == null) - { - pollerLatch.countUp(); - poller = new PollerRunnable(); - try - { - pollerExecutor.execute(poller); - } - catch (Exception ex) - { - ActiveMQJournalLogger.LOGGER.errorStartingPoller(ex); - } - } - } - finally - { - writeLock.unlock(); - } - } - 
- private void checkOpened() - { - if (!opened) - { - throw new RuntimeException("File is not opened"); - } - } - - /** - * @throws ActiveMQException - * @throws InterruptedException - */ - private void stopPoller() throws ActiveMQException, InterruptedException - { - Native.stopPoller(handler); - // We need to make sure we won't call close until Poller is - // completely done, or we might get beautiful GPFs - pollerLatch.await(); - } - - public static FileLock lock(int handle) - { - if (Native.flock(handle)) - { - return new ActiveMQFileLock(handle); - } - else - { - return null; - } - } - - // Native ---------------------------------------------------------------------------- - - - /** - * Explicitly adding a compare to clause that returns 0 for at least the same object. - * <br> - * If {@link Comparable#compareTo(Object)} does not return 0 -for at least the same object- some - * Collection classes methods will fail (example {@link PriorityQueue#remove(Object)}. If it - * returns 0, then {@link #equals(Object)} must return {@code true} for the exact same cases, - * otherwise we will get compatibility problems between Java5 and Java6. - */ - private static class CallbackHolder implements Comparable<CallbackHolder> - { - final long sequence; - - final AIOCallback callback; - - public boolean isError() - { - return false; - } - - public CallbackHolder(final long sequence, final AIOCallback callback) - { - this.sequence = sequence; - this.callback = callback; - } - - public int compareTo(final CallbackHolder o) - { - // It shouldn't be equals in any case - if (this == o) - return 0; - if (sequence <= o.sequence) - { - return -1; - } - else - { - return 1; - } - } - - /** - * See {@link CallbackHolder}. - */ - @Override - public int hashCode() - { - return super.hashCode(); - } - - /** - * See {@link CallbackHolder}. 
- */ - @Override - public boolean equals(Object obj) - { - return super.equals(obj); - } - } - - private static final class ErrorCallback extends CallbackHolder - { - final int errorCode; - - final String message; - - @Override - public boolean isError() - { - return true; - } - - public ErrorCallback(final long sequence, final AIOCallback callback, final int errorCode, final String message) - { - super(sequence, callback); - - this.errorCode = errorCode; - - this.message = message; - } - - /** - * See {@link CallbackHolder}. - */ - @Override - public int hashCode() - { - return super.hashCode(); - } - - /** - * See {@link CallbackHolder}. - */ - @Override - public boolean equals(Object obj) - { - return super.equals(obj); - } - } - - private class PollerRunnable implements Runnable - { - PollerRunnable() - { - } - - public void run() - { - try - { - pollEvents(); - } - finally - { - // This gives us extra protection in cases of interruption - // Case the poller thread is interrupted, this will allow us to - // restart the thread when required - poller = null; - pollerLatch.countDown(); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java new file mode 100644 index 0000000..acc0732 --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; +import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; +import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; +import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver; +import org.apache.activemq.artemis.journal.ActiveMQJournalBundle; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; + +public abstract class AbstractSequentialFile implements SequentialFile +{ + + private File file; + + protected final File directory; + + protected final SequentialFileFactory factory; + + protected long fileSize = 0; + + protected final AtomicLong position = new AtomicLong(0); + + protected TimedBuffer timedBuffer; + + /** + * Instead of having AIOSequentialFile implementing the Observer, I have done it on 
an inner class. + * This is the class returned to the factory when the file is being activated. + */ + protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver(); + + /** + * Used for asynchronous writes + */ + protected final Executor writerExecutor; + + /** + * @param file + * @param directory + */ + public AbstractSequentialFile(final File directory, + final String file, + final SequentialFileFactory factory, + final Executor writerExecutor) + { + super(); + this.file = new File(directory, file); + this.directory = directory; + this.factory = factory; + this.writerExecutor = writerExecutor; + } + + // Public -------------------------------------------------------- + + public final boolean exists() + { + return file.exists(); + } + + public final String getFileName() + { + return file.getName(); + } + + public final void delete() throws IOException, InterruptedException, ActiveMQException + { + if (isOpen()) + { + close(); + } + + if (file.exists() && !file.delete()) + { + ActiveMQJournalLogger.LOGGER.errorDeletingFile(this); + } + } + + public void copyTo(SequentialFile newFileName) throws Exception + { + try + { + ActiveMQJournalLogger.LOGGER.debug("Copying " + this + " as " + newFileName); + if (!newFileName.isOpen()) + { + newFileName.open(); + } + + if (!isOpen()) + { + this.open(); + } + + + ByteBuffer buffer = ByteBuffer.allocate(10 * 1024); + + for (;;) + { + buffer.rewind(); + int size = this.read(buffer); + newFileName.writeDirect(buffer, false); + if (size < 10 * 1024) + { + break; + } + } + newFileName.close(); + this.close(); + } + catch (IOException e) + { + factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); + throw e; + } + } + + /** + * @throws IOException only declare exception due to signature. Sub-class needs it. 
+ */ + @Override + public void position(final long pos) throws IOException + { + position.set(pos); + } + + public long position() + { + return position.get(); + } + + public final void renameTo(final String newFileName) throws IOException, InterruptedException, + ActiveMQException + { + try + { + close(); + } + catch (IOException e) + { + factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); + throw e; + } + + File newFile = new File(directory + "/" + newFileName); + + if (!file.equals(newFile)) + { + if (!file.renameTo(newFile)) + { + throw ActiveMQJournalBundle.BUNDLE.ioRenameFileError(file.getName(), newFileName); + } + file = newFile; + } + } + + /** + * @throws IOException we declare throwing IOException because sub-classes need to do it + * @throws ActiveMQException + */ + public synchronized void close() throws IOException, InterruptedException, ActiveMQException + { + final CountDownLatch donelatch = new CountDownLatch(1); + + if (writerExecutor != null) + { + writerExecutor.execute(new Runnable() + { + public void run() + { + donelatch.countDown(); + } + }); + + while (!donelatch.await(60, TimeUnit.SECONDS)) + { + ActiveMQJournalLogger.LOGGER.couldNotCompleteTask(new Exception("trace"), file.getName()); + } + } + } + + public final boolean fits(final int size) + { + if (timedBuffer == null) + { + return position.get() + size <= fileSize; + } + else + { + return timedBuffer.checkSize(size); + } + } + + public void setTimedBuffer(final TimedBuffer buffer) + { + if (timedBuffer != null) + { + timedBuffer.setObserver(null); + } + + timedBuffer = buffer; + + if (buffer != null) + { + buffer.setObserver(timedBufferObserver); + } + + } + + public void write(final ActiveMQBuffer bytes, final boolean sync, final IOCallback callback) throws IOException + { + if (timedBuffer != null) + { + bytes.setIndex(0, bytes.capacity()); + timedBuffer.addBytes(bytes, sync, callback); + } + else + { + ByteBuffer buffer = 
factory.newBuffer(bytes.capacity()); + buffer.put(bytes.toByteBuffer().array()); + buffer.rewind(); + writeDirect(buffer, sync, callback); + } + } + + public void write(final ActiveMQBuffer bytes, final boolean sync) throws IOException, InterruptedException, + ActiveMQException + { + if (sync) + { + SimpleWaitIOCallback completion = new SimpleWaitIOCallback(); + + write(bytes, true, completion); + + completion.waitCompletion(); + } + else + { + write(bytes, false, DummyCallback.getInstance()); + } + } + + public void write(final EncodingSupport bytes, final boolean sync, final IOCallback callback) + { + if (timedBuffer != null) + { + timedBuffer.addBytes(bytes, sync, callback); + } + else + { + ByteBuffer buffer = factory.newBuffer(bytes.getEncodeSize()); + + // If not using the TimedBuffer, a final copy is necessary + // Because AIO will need a specific Buffer + // And NIO will also need a whole buffer to perform the write + + ActiveMQBuffer outBuffer = ActiveMQBuffers.wrappedBuffer(buffer); + bytes.encode(outBuffer); + buffer.rewind(); + writeDirect(buffer, sync, callback); + } + } + + public void write(final EncodingSupport bytes, final boolean sync) throws InterruptedException, ActiveMQException + { + if (sync) + { + SimpleWaitIOCallback completion = new SimpleWaitIOCallback(); + + write(bytes, true, completion); + + completion.waitCompletion(); + } + else + { + write(bytes, false, DummyCallback.getInstance()); + } + } + + protected File getFile() + { + return file; + } + + private static final class DelegateCallback implements IOCallback + { + final List<IOCallback> delegates; + + private DelegateCallback(final List<IOCallback> delegates) + { + this.delegates = delegates; + } + + public void done() + { + for (IOCallback callback : delegates) + { + try + { + callback.done(); + } + catch (Throwable e) + { + ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e); + } + } + } + + public void onError(final int errorCode, final String errorMessage) + { + for 
(IOCallback callback : delegates) + { + try + { + callback.onError(errorCode, errorMessage); + } + catch (Throwable e) + { + ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e); + } + } + } + } + + protected ByteBuffer newBuffer(int size, int limit) + { + size = factory.calculateBlockSize(size); + limit = factory.calculateBlockSize(limit); + + ByteBuffer buffer = factory.newBuffer(size); + buffer.limit(limit); + return buffer; + } + + protected class LocalBufferObserver implements TimedBufferObserver + { + public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOCallback> callbacks) + { + buffer.flip(); + + if (buffer.limit() == 0) + { + factory.releaseBuffer(buffer); + } + else + { + writeDirect(buffer, requestedSync, new DelegateCallback(callbacks)); + } + } + + public ByteBuffer newBuffer(final int size, final int limit) + { + return AbstractSequentialFile.this.newBuffer(size, limit); + } + + public int getRemainingBytes() + { + if (fileSize - position.get() > Integer.MAX_VALUE) + { + return Integer.MAX_VALUE; + } + else + { + return (int)(fileSize - position.get()); + } + } + + @Override + public String toString() + { + return "TimedBufferObserver on file (" + getFile().getName() + ")"; + } + + } + + @Override + public File getJavaFile() + { + return getFile().getAbsoluteFile(); + } +}
