http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCompletion.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCompletion.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCompletion.java index b85a845..d0140f1 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCompletion.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCompletion.java @@ -16,7 +16,9 @@ */ package org.apache.activemq.artemis.core.journal; -public interface IOCompletion extends IOAsyncTask +import org.apache.activemq.artemis.core.io.IOCallback; + +public interface IOCompletion extends IOCallback { void storeLineUp(); } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCriticalErrorListener.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCriticalErrorListener.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCriticalErrorListener.java deleted file mode 100644 index fc0bbf9..0000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCriticalErrorListener.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.journal; - -public interface IOCriticalErrorListener -{ - void onIOException(Exception code, String message, SequentialFile file); -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java index f3335b0..6b0beab 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.journal; import java.util.List; import java.util.Map; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.server.ActiveMQComponent; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFile.java deleted file mode 100644 index 34e6d02..0000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFile.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.journal; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.core.journal.impl.TimedBuffer; - -public interface SequentialFile -{ - /* - * Creates the file if it doesn't already exist, then opens it - */ - void open() throws Exception; - - boolean isOpen(); - - boolean exists(); - - /** - * The maximum number of simultaneous writes accepted - * @param maxIO - * @throws Exception - */ - void open(int maxIO, boolean useExecutor) throws Exception; - - boolean fits(int size); - - int getAlignment() throws Exception; - - int calculateBlockStart(int position) throws Exception; - - String getFileName(); - - void fill(int position, int size, byte fillCharacter) throws Exception; - - void delete() throws IOException, InterruptedException, ActiveMQException; - - void write(ActiveMQBuffer bytes, boolean sync, IOAsyncTask callback) throws Exception; - - void write(ActiveMQBuffer bytes, boolean sync) throws Exception; - - void write(EncodingSupport bytes, boolean sync, IOAsyncTask callback) throws Exception; - - void write(EncodingSupport bytes, boolean sync) throws Exception; - - /** - * Write directly to the file without using any buffer - * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or - * NIO). To be safe, use a buffer from the corresponding - * {@link SequentialFileFactory#newBuffer(int)}. - */ - void writeDirect(ByteBuffer bytes, boolean sync, IOAsyncTask callback); - - /** - * Write directly to the file without using any buffer - * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or - * NIO). To be safe, use a buffer from the corresponding - * {@link SequentialFileFactory#newBuffer(int)}. - */ - void writeDirect(ByteBuffer bytes, boolean sync) throws Exception; - - /** - * Write directly to the file. This is used by compacting and other places where we write a big - * buffer in a single shot. writeInternal should always block until the entire write is sync on - * disk. - * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or - * NIO). To be safe, use a buffer from the corresponding - * {@link SequentialFileFactory#newBuffer(int)}. - */ - void writeInternal(ByteBuffer bytes) throws Exception; - - /** - * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or - * NIO). To be safe, use a buffer from the corresponding - * {@link SequentialFileFactory#newBuffer(int)}. - */ - int read(ByteBuffer bytes, IOAsyncTask callback) throws Exception; - - /** - * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or - * NIO). To be safe, use a buffer from the corresponding - * {@link SequentialFileFactory#newBuffer(int)}. - */ - int read(ByteBuffer bytes) throws Exception; - - void position(long pos) throws IOException; - - long position(); - - void close() throws Exception; - - void waitForClose() throws Exception; - - void sync() throws IOException; - - long size() throws Exception; - - void renameTo(String newFileName) throws Exception; - - SequentialFile cloneFile(); - - void copyTo(SequentialFile newFileName) throws Exception; - - void setTimedBuffer(TimedBuffer buffer); - - /** - * Returns a native File of the file underlying this sequential file. - */ - File getJavaFile(); -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFileFactory.java deleted file mode 100644 index cb47bd9..0000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFileFactory.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.journal; - -import java.io.File; -import java.nio.ByteBuffer; -import java.util.List; - -/** - * - * A SequentialFileFactory - */ -public interface SequentialFileFactory -{ - SequentialFile createSequentialFile(String fileName, int maxIO); - - /** - * Lists files that end with the given extension. - * <p> - * This method inserts a ".' before the extension. - * @param extension - * @return - * @throws Exception - */ - List<String> listFiles(String extension) throws Exception; - - boolean isSupportsCallbacks(); - - /** The SequentialFile will call this method when a disk IO Error happens during the live phase. */ - void onIOError(Exception exception, String message, SequentialFile file); - - /** used for cases where you need direct buffer outside of the journal context. - * This is because the native layer has a method that can be reused in certain cases like paging */ - ByteBuffer allocateDirectBuffer(int size); - - /** used for cases where you need direct buffer outside of the journal context. - * This is because the native layer has a method that can be reused in certain cases like paging */ - void releaseDirectBuffer(ByteBuffer buffer); - - /** - * Note: You need to release the buffer if is used for reading operations. You don't need to do - * it if using writing operations (AIO Buffer Lister will take of writing operations) - * @param size - * @return the allocated ByteBuffer - */ - ByteBuffer newBuffer(int size); - - void releaseBuffer(ByteBuffer buffer); - - void activateBuffer(SequentialFile file); - - void deactivateBuffer(); - - // To be used in tests only - ByteBuffer wrapBuffer(byte[] bytes); - - int getAlignment(); - - int calculateBlockSize(int bytes); - - File getDirectory(); - - void clearBuffer(ByteBuffer buffer); - - void start(); - - void stop(); - - /** - * Creates the directory if it does not exist yet. - */ - void createDirs() throws Exception; - - void flush(); -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFile.java deleted file mode 100644 index acef8a5..0000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFile.java +++ /dev/null @@ -1,326 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.journal.impl; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.concurrent.Executor; - -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.core.asyncio.AsynchronousFile; -import org.apache.activemq.artemis.core.asyncio.BufferCallback; -import org.apache.activemq.artemis.core.asyncio.IOExceptionListener; -import org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; -import org.apache.activemq.artemis.core.journal.SequentialFile; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; - -public class AIOSequentialFile extends AbstractSequentialFile implements IOExceptionListener -{ - private boolean opened = false; - - private final int maxIO; - - private AsynchronousFile aioFile; - - private final BufferCallback bufferCallback; - - /** The pool for Thread pollers */ - private final Executor pollerExecutor; - - public AIOSequentialFile(final SequentialFileFactory factory, - final int bufferSize, - final long bufferTimeoutMilliseconds, - final File directory, - final String fileName, - final int maxIO, - final BufferCallback bufferCallback, - final Executor writerExecutor, - final Executor pollerExecutor) - { - super(directory, fileName, factory, writerExecutor); - this.maxIO = maxIO; - this.bufferCallback = bufferCallback; - this.pollerExecutor = pollerExecutor; - } - - public boolean isOpen() - { - return opened; - } - - public int getAlignment() - { - checkOpened(); - - return aioFile.getBlockSize(); - } - - public int calculateBlockStart(final int position) - { - int alignment = getAlignment(); - - int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment; - - return pos; - } - - public SequentialFile cloneFile() - { - return new AIOSequentialFile(factory, - -1, - -1, - getFile().getParentFile(), - getFile().getName(), - maxIO, - bufferCallback, - writerExecutor, - pollerExecutor); - } - - @Override - public synchronized void close() throws IOException, InterruptedException, ActiveMQException - { - if (!opened) - { - return; - } - - super.close(); - - opened = false; - - timedBuffer = null; - - aioFile.close(); - aioFile = null; - - notifyAll(); - } - - @Override - public synchronized void waitForClose() throws Exception - { - while (isOpen()) - { - wait(); - } - } - - public void fill(final int position, final int size, final byte fillCharacter) throws Exception - { - checkOpened(); - - int fileblockSize = aioFile.getBlockSize(); - - int blockSize = fileblockSize; - - if (size % (100 * 1024 * 1024) == 0) - { - blockSize = 100 * 1024 * 1024; - } - else if (size % (10 * 1024 * 1024) == 0) - { - blockSize = 10 * 1024 * 1024; - } - else if (size % (1024 * 1024) == 0) - { - blockSize = 1024 * 1024; - } - else if (size % (10 * 1024) == 0) - { - blockSize = 10 * 1024; - } - else - { - blockSize = fileblockSize; - } - - int blocks = size / blockSize; - - if (size % blockSize != 0) - { - blocks++; - } - - int filePosition = position; - - if (position % fileblockSize != 0) - { - filePosition = (position / fileblockSize + 1) * fileblockSize; - } - - aioFile.fill(filePosition, blocks, blockSize, fillCharacter); - - fileSize = aioFile.size(); - } - - public void open() throws Exception - { - open(maxIO, true); - } - - public synchronized void open(final int maxIO, final boolean useExecutor) throws ActiveMQException - { - opened = true; - - aioFile = new AsynchronousFileImpl(useExecutor ? writerExecutor : null, pollerExecutor, this); - - try - { - aioFile.open(getFile().getAbsolutePath(), maxIO); - } - catch (ActiveMQException e) - { - factory.onIOError(e, e.getMessage(), this); - throw e; - } - - position.set(0); - - aioFile.setBufferCallback(bufferCallback); - - fileSize = aioFile.size(); - } - - public void setBufferCallback(final BufferCallback callback) - { - aioFile.setBufferCallback(callback); - } - - public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws ActiveMQException - { - int bytesToRead = bytes.limit(); - - long positionToRead = position.getAndAdd(bytesToRead); - - bytes.rewind(); - - aioFile.read(positionToRead, bytesToRead, bytes, callback); - - return bytesToRead; - } - - public int read(final ByteBuffer bytes) throws Exception - { - SimpleWaitIOCallback waitCompletion = new SimpleWaitIOCallback(); - - int bytesRead = read(bytes, waitCompletion); - - waitCompletion.waitCompletion(); - - return bytesRead; - } - - public void sync() - { - throw new UnsupportedOperationException("This method is not supported on AIO"); - } - - public long size() throws Exception - { - if (aioFile == null) - { - return getFile().length(); - } - else - { - return aioFile.size(); - } - } - - @Override - public String toString() - { - return "AIOSequentialFile:" + getFile().getAbsolutePath(); - } - - // Public methods - // ----------------------------------------------------------------------------------------------------- - - @Override - public void onIOException(Exception code, String message) - { - factory.onIOError(code, message, this); - } - - - public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception - { - if (sync) - { - SimpleWaitIOCallback completion = new SimpleWaitIOCallback(); - - writeDirect(bytes, true, completion); - - completion.waitCompletion(); - } - else - { - writeDirect(bytes, false, DummyCallback.getInstance()); - } - } - - /** - * - * @param sync Not used on AIO - * */ - public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) - { - final int bytesToWrite = factory.calculateBlockSize(bytes.limit()); - - final long positionToWrite = position.getAndAdd(bytesToWrite); - - aioFile.write(positionToWrite, bytesToWrite, bytes, callback); - } - - public void writeInternal(final ByteBuffer bytes) throws ActiveMQException - { - final int bytesToWrite = factory.calculateBlockSize(bytes.limit()); - - final long positionToWrite = position.getAndAdd(bytesToWrite); - - aioFile.writeInternal(positionToWrite, bytesToWrite, bytes); - } - - // Protected methods - // ----------------------------------------------------------------------------------------------------- - - @Override - protected ByteBuffer newBuffer(int size, int limit) - { - size = factory.calculateBlockSize(size); - limit = factory.calculateBlockSize(limit); - - ByteBuffer buffer = factory.newBuffer(size); - buffer.limit(limit); - return buffer; - } - - // Private methods - // ----------------------------------------------------------------------------------------------------- - - private void checkOpened() - { - if (aioFile == null || !opened) - { - throw new IllegalStateException("File not opened"); - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFileFactory.java deleted file mode 100644 index 65e6a6f..0000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFileFactory.java +++ /dev/null @@ -1,358 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.journal.impl; - -import java.io.File; -import java.nio.ByteBuffer; -import java.security.AccessController; -import java.security.PrivilegedAction; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; -import org.apache.activemq.artemis.core.asyncio.BufferCallback; -import org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl; -import org.apache.activemq.artemis.core.journal.IOCriticalErrorListener; -import org.apache.activemq.artemis.core.journal.SequentialFile; -import org.apache.activemq.artemis.core.libaio.Native; -import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; -import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; - -public final class AIOSequentialFileFactory extends AbstractSequentialFileFactory -{ - private static final boolean trace = ActiveMQJournalLogger.LOGGER.isTraceEnabled(); - - private final ReuseBuffersController buffersControl = new ReuseBuffersController(); - - private ExecutorService pollerExecutor; - - // This method exists just to make debug easier. - // I could replace log.trace by log.info temporarily while I was debugging - // Journal - private static void trace(final String message) - { - ActiveMQJournalLogger.LOGGER.trace(message); - } - - public AIOSequentialFileFactory(final File journalDir) - { - this(journalDir, - JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, - JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, - false, - null); - } - - public AIOSequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener) - { - this(journalDir, - JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, - JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, - false, - listener); - } - - public AIOSequentialFileFactory(final File journalDir, - final int bufferSize, - final int bufferTimeout, - final boolean logRates) - { - this(journalDir, bufferSize, bufferTimeout, logRates, null); - } - - public AIOSequentialFileFactory(final File journalDir, - final int bufferSize, - final int bufferTimeout, - final boolean logRates, - final IOCriticalErrorListener listener) - { - super(journalDir, true, bufferSize, bufferTimeout, logRates, listener); - } - - public SequentialFile createSequentialFile(final String fileName, final int maxIO) - { - return new AIOSequentialFile(this, - bufferSize, - bufferTimeout, - journalDir, - fileName, - maxIO, - buffersControl.callback, - writeExecutor, - pollerExecutor); - } - - public boolean isSupportsCallbacks() - { - return true; - } - - public static boolean isSupported() - { - return AsynchronousFileImpl.isLoaded(); - } - - public ByteBuffer allocateDirectBuffer(final int size) - { - - int blocks = size / 512; - if (size % 512 != 0) - { - blocks++; - } - - // The buffer on AIO has to be a multiple of 512 - ByteBuffer buffer = AsynchronousFileImpl.newBuffer(blocks * 512); - - buffer.limit(size); - - return buffer; - } - - public void releaseDirectBuffer(final ByteBuffer buffer) - { - Native.destroyBuffer(buffer); - } - - public ByteBuffer newBuffer(int size) - { - if (size % 512 != 0) - { - size = (size / 512 + 1) * 512; - } - - return buffersControl.newBuffer(size); - } - - public void clearBuffer(final ByteBuffer directByteBuffer) - { - AsynchronousFileImpl.clearBuffer(directByteBuffer); - } - - public int getAlignment() - { - return 512; - } - - // For tests only - public ByteBuffer wrapBuffer(final byte[] bytes) - { - ByteBuffer newbuffer = newBuffer(bytes.length); - newbuffer.put(bytes); - return newbuffer; - } - - public int calculateBlockSize(final int position) - { - int alignment = getAlignment(); - - int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment; - - return pos; - } - - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer) - */ - @Override - public synchronized void releaseBuffer(final ByteBuffer buffer) - { - Native.destroyBuffer(buffer); - } - - @Override - public void start() - { - super.start(); - - pollerExecutor = Executors.newCachedThreadPool(new ActiveMQThreadFactory("ActiveMQ-AIO-poller-pool" + System.identityHashCode(this), - true, - AIOSequentialFileFactory.getThisClassLoader())); - - } - - @Override - public void stop() - { - buffersControl.stop(); - - if (pollerExecutor != null) - { - pollerExecutor.shutdown(); - - try - { - if (!pollerExecutor.awaitTermination(AbstractSequentialFileFactory.EXECUTOR_TIMEOUT, TimeUnit.SECONDS)) - { - ActiveMQJournalLogger.LOGGER.timeoutOnPollerShutdown(new Exception("trace")); - } - } - catch (InterruptedException e) - { - throw new ActiveMQInterruptedException(e); - } - } - - super.stop(); - } - - @Override - protected void finalize() - { - stop(); - } - - /** - * Class that will control buffer-reuse - */ - private class ReuseBuffersController - { - private volatile long bufferReuseLastTime = System.currentTimeMillis(); - - /** - * This queue is fed by {@link org.apache.activemq.artemis.core.journal.impl.AIOSequentialFileFactory.ReuseBuffersController.LocalBufferCallback} - * which is called directly by NIO or NIO. On the case of the AIO this is almost called by the native layer as - * soon as the buffer is not being used any more and ready to be reused or GCed - */ - private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffersQueue = new ConcurrentLinkedQueue<ByteBuffer>(); - - private boolean stopped = false; - - final BufferCallback callback = new LocalBufferCallback(); - - public ByteBuffer newBuffer(final int size) - { - // if a new buffer wasn't requested in 10 seconds, we clear the queue - // This is being done this way as we don't need another Timeout Thread - // just to cleanup this - if (bufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000) - { - if (AIOSequentialFileFactory.trace) - { - AIOSequentialFileFactory.trace("Clearing reuse buffers queue with " + reuseBuffersQueue.size() + - " elements"); - } - - bufferReuseLastTime = System.currentTimeMillis(); - - clearPoll(); - } - - // if a buffer is bigger than the configured-bufferSize, we just create a new - // buffer. - if (size > bufferSize) - { - return AsynchronousFileImpl.newBuffer(size); - } - else - { - // We need to allocate buffers following the rules of the storage - // being used (AIO/NIO) - int alignedSize = calculateBlockSize(size); - - // Try getting a buffer from the queue... - ByteBuffer buffer = reuseBuffersQueue.poll(); - - if (buffer == null) - { - // if empty create a new one. - buffer = AsynchronousFileImpl.newBuffer(bufferSize); - - buffer.limit(alignedSize); - } - else - { - clearBuffer(buffer); - - // set the limit of the buffer to the bufferSize being required - buffer.limit(alignedSize); - } - - buffer.rewind(); - - return buffer; - } - } - - public synchronized void stop() - { - stopped = true; - clearPoll(); - } - - public synchronized void clearPoll() - { - ByteBuffer reusedBuffer; - - while ((reusedBuffer = reuseBuffersQueue.poll()) != null) - { - releaseBuffer(reusedBuffer); - } - } - - private class LocalBufferCallback implements BufferCallback - { - public void bufferDone(final ByteBuffer buffer) - { - synchronized (ReuseBuffersController.this) - { - - if (stopped) - { - releaseBuffer(buffer); - } - else - { - bufferReuseLastTime = System.currentTimeMillis(); - - // If a buffer has any other than the configured bufferSize, the buffer - // will be just sent to GC - if (buffer.capacity() == bufferSize) - { - reuseBuffersQueue.offer(buffer); - } - else - { - releaseBuffer(buffer); - } - } - } - } - } - } - - private static ClassLoader getThisClassLoader() - { - return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() - { - public ClassLoader run() - { - return AIOSequentialFileFactory.class.getClassLoader(); - } - }); - - } - - @Override - public String toString() - { - return AIOSequentialFileFactory.class.getSimpleName() + "(buffersControl.stopped=" + buffersControl.stopped + - "):" + super.toString(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java index e21b046..b36a0c4 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java @@ -24,8 +24,8 @@ import java.util.Set; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.Pair; -import org.apache.activemq.artemis.core.journal.SequentialFile; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord; @@ -87,7 +87,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback final List<Pair<String, String>> renames) throws Exception { - SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL, 1); + SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL); try { @@ -182,7 +182,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback // To Fix the size of the file writingChannel.writerIndex(writingChannel.capacity()); - sequentialFile.writeInternal(writingChannel.toByteBuffer()); + sequentialFile.writeDirect(writingChannel.toByteBuffer(), true); sequentialFile.close(); newDataFiles.add(currentFile); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFile.java deleted file mode 100644 index a4eed58..0000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFile.java +++ /dev/null @@ -1,407 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.journal.impl; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffers; -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; -import org.apache.activemq.artemis.core.journal.EncodingSupport; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; -import org.apache.activemq.artemis.core.journal.SequentialFile; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; -import org.apache.activemq.artemis.journal.ActiveMQJournalBundle; -import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; - -public abstract class AbstractSequentialFile implements SequentialFile -{ - - private File file; - - protected final File directory; - - protected final SequentialFileFactory factory; - - protected long fileSize = 0; - - protected final AtomicLong position = new AtomicLong(0); - - protected TimedBuffer timedBuffer; - - /** - * Instead of having AIOSequentialFile implementing the Observer, I have done it on an inner class. - * This is the class returned to the factory when the file is being activated. - */ - protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver(); - - /** - * Used for asynchronous writes - */ - protected final Executor writerExecutor; - - /** - * @param file - * @param directory - */ - public AbstractSequentialFile(final File directory, - final String file, - final SequentialFileFactory factory, - final Executor writerExecutor) - { - super(); - this.file = new File(directory, file); - this.directory = directory; - this.factory = factory; - this.writerExecutor = writerExecutor; - } - - // Public -------------------------------------------------------- - - public final boolean exists() - { - return file.exists(); - } - - public final String getFileName() - { - return file.getName(); - } - - public final void delete() throws IOException, InterruptedException, ActiveMQException - { - if (isOpen()) - { - close(); - } - - if (file.exists() && !file.delete()) - { - ActiveMQJournalLogger.LOGGER.errorDeletingFile(this); - } - } - - public void copyTo(SequentialFile newFileName) throws Exception - { - try - { - ActiveMQJournalLogger.LOGGER.debug("Copying " + this + " as " + newFileName); - if (!newFileName.isOpen()) - { - newFileName.open(); - } - - if (!isOpen()) - { - this.open(); - } - - - ByteBuffer buffer = ByteBuffer.allocate(10 * 1024); - - for (;;) - { - buffer.rewind(); - int size = this.read(buffer); - newFileName.writeDirect(buffer, false); - if (size < 10 * 1024) - { - break; - } - } - newFileName.close(); - this.close(); - } - catch (IOException e) - { - factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); - throw e; - } - } - - /** - * @throws IOException only declare exception due to signature. Sub-class needs it. - */ - @Override - public void position(final long pos) throws IOException - { - position.set(pos); - } - - public long position() - { - return position.get(); - } - - public final void renameTo(final String newFileName) throws IOException, InterruptedException, - ActiveMQException - { - try - { - close(); - } - catch (IOException e) - { - factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); - throw e; - } - - File newFile = new File(directory + "/" + newFileName); - - if (!file.equals(newFile)) - { - if (!file.renameTo(newFile)) - { - throw ActiveMQJournalBundle.BUNDLE.ioRenameFileError(file.getName(), newFileName); - } - file = newFile; - } - } - - /** - * @throws IOException we declare throwing IOException because sub-classes need to do it - * @throws ActiveMQException - */ - public synchronized void close() throws IOException, InterruptedException, ActiveMQException - { - final CountDownLatch donelatch = new CountDownLatch(1); - - if (writerExecutor != null) - { - writerExecutor.execute(new Runnable() - { - public void run() - { - donelatch.countDown(); - } - }); - - while (!donelatch.await(60, TimeUnit.SECONDS)) - { - ActiveMQJournalLogger.LOGGER.couldNotCompleteTask(new Exception("trace"), file.getName()); - } - } - } - - public final boolean fits(final int size) - { - if (timedBuffer == null) - { - return position.get() + size <= fileSize; - } - else - { - return timedBuffer.checkSize(size); - } - } - - public void setTimedBuffer(final TimedBuffer buffer) - { - if (timedBuffer != null) - { - timedBuffer.setObserver(null); - } - - timedBuffer = buffer; - - if (buffer != null) - { - buffer.setObserver(timedBufferObserver); - } - - } - - public void write(final ActiveMQBuffer bytes, final boolean sync, final IOAsyncTask callback) throws IOException - { - if (timedBuffer != null) - { - bytes.setIndex(0, bytes.capacity()); - timedBuffer.addBytes(bytes, sync, callback); - } - else - { - ByteBuffer buffer = factory.newBuffer(bytes.capacity()); - buffer.put(bytes.toByteBuffer().array()); - buffer.rewind(); - writeDirect(buffer, sync, callback); - } - } - - public void write(final ActiveMQBuffer bytes, final boolean sync) throws IOException, InterruptedException, - ActiveMQException - { - if (sync) - { - SimpleWaitIOCallback completion = new SimpleWaitIOCallback(); - - write(bytes, true, completion); - - completion.waitCompletion(); - } - else - { - write(bytes, false, DummyCallback.getInstance()); - } - } - - public void write(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback) - { - if (timedBuffer != null) - { - timedBuffer.addBytes(bytes, sync, callback); - } - else - { - ByteBuffer buffer = factory.newBuffer(bytes.getEncodeSize()); - - // If not using the TimedBuffer, a final copy is necessary - // Because AIO will need a specific Buffer - // And NIO will also need a whole buffer to perform the write - - ActiveMQBuffer outBuffer = ActiveMQBuffers.wrappedBuffer(buffer); - bytes.encode(outBuffer); - buffer.rewind(); - writeDirect(buffer, sync, callback); - } - } - - public void write(final EncodingSupport bytes, final boolean sync) throws InterruptedException, ActiveMQException - { - if (sync) - { - SimpleWaitIOCallback completion = new SimpleWaitIOCallback(); - - write(bytes, true, completion); - - completion.waitCompletion(); - } - else - { - write(bytes, false, DummyCallback.getInstance()); - } - } - - protected File getFile() - { - return file; - } - - private static final class DelegateCallback implements IOAsyncTask - { - final List<IOAsyncTask> delegates; - - private DelegateCallback(final List<IOAsyncTask> delegates) - { - this.delegates = delegates; - } - - public void done() - { - for (IOAsyncTask callback : delegates) - { - try - { - callback.done(); - } - catch (Throwable e) - { - ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e); - } - } - } - - public void onError(final int errorCode, final String errorMessage) - { - for (IOAsyncTask callback : delegates) - { - try - { - callback.onError(errorCode, errorMessage); - } - catch (Throwable e) - { - ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e); - } - } - } - } - - protected ByteBuffer newBuffer(int size, int limit) - { - size = factory.calculateBlockSize(size); - limit = factory.calculateBlockSize(limit); - - ByteBuffer buffer = factory.newBuffer(size); - buffer.limit(limit); - return buffer; - } - - protected class LocalBufferObserver implements TimedBufferObserver - { - public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOAsyncTask> callbacks) - { - buffer.flip(); - - if (buffer.limit() == 0) - { - factory.releaseBuffer(buffer); - } - else - { - writeDirect(buffer, requestedSync, new DelegateCallback(callbacks)); - } - } - - public ByteBuffer newBuffer(final int size, final int limit) - { - return AbstractSequentialFile.this.newBuffer(size, limit); - } - - public int getRemainingBytes() - { - if (fileSize - position.get() > Integer.MAX_VALUE) - { - return Integer.MAX_VALUE; - } - else - { - return (int)(fileSize - position.get()); - } - } - - @Override - public String toString() - { - return "TimedBufferObserver on file (" + getFile().getName() + ")"; - } - - } - - @Override - public File getJavaFile() - { - return getFile().getAbsoluteFile(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFileFactory.java deleted file mode 100644 index ec0ab4d..0000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFileFactory.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.journal.impl; - -import java.io.File; -import java.io.FilenameFilter; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.security.AccessController; -import java.security.PrivilegedAction; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; -import org.apache.activemq.artemis.core.journal.IOCriticalErrorListener; -import org.apache.activemq.artemis.core.journal.SequentialFile; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; -import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; -import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; - -/** - * An abstract SequentialFileFactory containing basic functionality for both AIO and NIO SequentialFactories - */ -abstract class AbstractSequentialFileFactory implements SequentialFileFactory -{ - - // Timeout used to wait executors to shutdown - protected static final int EXECUTOR_TIMEOUT = 60; - - protected final File journalDir; - - protected final TimedBuffer timedBuffer; - - protected final int bufferSize; - - protected final long bufferTimeout; - - private final IOCriticalErrorListener critialErrorListener; - - /** - * Asynchronous writes need to be done at another executor. - * This needs to be done at NIO, or else we would have the callers thread blocking for the return. - * At AIO this is necessary as context switches on writes would fire flushes at the kernel. - * */ - protected ExecutorService writeExecutor; - - AbstractSequentialFileFactory(final File journalDir, - final boolean buffered, - final int bufferSize, - final int bufferTimeout, - final boolean logRates, - final IOCriticalErrorListener criticalErrorListener) - { - this.journalDir = journalDir; - - if (buffered) - { - timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, logRates); - } - else - { - timedBuffer = null; - } - this.bufferSize = bufferSize; - this.bufferTimeout = bufferTimeout; - this.critialErrorListener = criticalErrorListener; - } - - public void stop() - { - if (timedBuffer != null) - { - timedBuffer.stop(); - } - - if (isSupportsCallbacks() && writeExecutor != null) - { - writeExecutor.shutdown(); - - try - { - if (!writeExecutor.awaitTermination(AbstractSequentialFileFactory.EXECUTOR_TIMEOUT, TimeUnit.SECONDS)) - { - ActiveMQJournalLogger.LOGGER.timeoutOnWriterShutdown(new Exception("trace")); - } - } - catch (InterruptedException e) - { - throw new ActiveMQInterruptedException(e); - } - } - } - - @Override - public File getDirectory() - { - return journalDir; - } - - public void start() - { - if (timedBuffer != null) - { - timedBuffer.start(); - } - - if (isSupportsCallbacks()) - { - writeExecutor = Executors.newSingleThreadExecutor(new ActiveMQThreadFactory("ActiveMQ-Asynchronous-Persistent-Writes" + System.identityHashCode(this), - true, - AbstractSequentialFileFactory.getThisClassLoader())); - } - - } - - @Override - public void onIOError(Exception exception, String message, SequentialFile file) - { - if (critialErrorListener != null) - { - critialErrorListener.onIOException(exception, message, file); - } - } - - @Override - public void activateBuffer(final SequentialFile file) - { - if (timedBuffer != null) - { - file.setTimedBuffer(timedBuffer); - } - } - - public void flush() - { - if (timedBuffer != null) - { - timedBuffer.flush(); - } - } - - public void deactivateBuffer() - { - if (timedBuffer != null) - { - // When moving to a new file, we need to make sure any pending buffer will be transferred to the buffer - timedBuffer.flush(); - timedBuffer.setObserver(null); - } - } - - public void releaseBuffer(final ByteBuffer buffer) - { - } - - /** - * Create the directory if it doesn't exist yet - */ - public void createDirs() throws Exception - { - boolean ok = journalDir.mkdirs(); - if (!ok) - { - throw new IOException("Failed to create directory " + journalDir); - } - } - - public List<String> listFiles(final String extension) throws Exception - { - FilenameFilter fnf = new FilenameFilter() - { - public boolean accept(final File file, final String name) - { - return name.endsWith("." + extension); - } - }; - - String[] fileNames = journalDir.list(fnf); - - if (fileNames == null) - { - return Collections.EMPTY_LIST; - } - - return Arrays.asList(fileNames); - } - - private static ClassLoader getThisClassLoader() - { - return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() - { - public ClassLoader run() - { - return AbstractSequentialFileFactory.class.getClassLoader(); - } - }); - - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/DummyCallback.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/DummyCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/DummyCallback.java deleted file mode 100644 index 9c4c3d6..0000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/DummyCallback.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.journal.impl; - -import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; - -class DummyCallback extends SyncIOCompletion -{ - private static final DummyCallback instance = new DummyCallback(); - - public static DummyCallback getInstance() - { - return DummyCallback.instance; - } - - public void done() - { - } - - public void onError(final int errorCode, final String errorMessage) - { - ActiveMQJournalLogger.LOGGER.errorWritingData(new Exception(errorMessage), errorMessage, errorCode); - } - - @Override - public void waitCompletion() throws Exception - { - } - - @Override - public void storeLineUp() - { - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java index a41e72f..5a0f11f 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java @@ -32,7 +32,7 @@ import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecordTX; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java index d6a856a..1ba8f0b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.journal.impl; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.io.DummyCallback; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.Journal; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java index 5b0a1b8..1f657a2 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java @@ -28,8 +28,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.journal.SequentialFile; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecordTX; @@ -64,7 +64,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ final List<String> newFiles, final List<Pair<String, String>> renameFile) throws Exception { - SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL, 1); + SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL); if (controlFile.exists()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java index e3b1624..dcfc1a2 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java @@ -16,7 +16,7 @@ */ package org.apache.activemq.artemis.core.journal.impl; -import org.apache.activemq.artemis.core.journal.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFile; public interface JournalFile { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java index 4438a96..7e96575 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java @@ -21,7 +21,7 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.activemq.artemis.core.journal.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFile; public class JournalFileImpl implements JournalFile { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java index 052dc25..268a23d 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java @@ -31,8 +31,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.apache.activemq.artemis.core.journal.SequentialFile; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; /** @@ -662,13 +662,13 @@ public class JournalFilesRepository String tmpFileName = fileName + ".tmp"; - SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName, maxAIO); + SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName); sequentialFile.open(1, false); if (init) { - sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER); + sequentialFile.fill(fileSize); JournalImpl.initFileHeader(fileFactory, sequentialFile, userVersion, fileID); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 6f411eb..068e697 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -46,15 +46,15 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.journal.EncodingSupport; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.journal.SequentialFile; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.TestableJournal; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; @@ -293,7 +293,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final CountDownLatch latch = new CountDownLatch(numIts * 2); - class MyIOAsyncTask implements IOCompletion + class MyAIOCallback implements IOCompletion { public void done() { @@ -310,7 +310,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } - final MyIOAsyncTask task = new MyIOAsyncTask(); + final MyAIOCallback task = new MyAIOCallback(); final int recordSize = 1024; @@ -373,11 +373,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal for (String fileName : fileNames) { - SequentialFile file = fileFactory.createSequentialFile(fileName, filesRepository.getMaxAIO()); + SequentialFile file = fileFactory.createSequentialFile(fileName); if (file.size() >= SIZE_HEADER) { - file.open(1, false); + file.open(); try { @@ -2776,11 +2776,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final boolean completeTransaction, final boolean sync, final JournalTransaction tx, - final IOAsyncTask parameterCallback) throws Exception + final IOCallback parameterCallback) throws Exception { checkJournalIsLoaded(); - final IOAsyncTask callback; + final IOCallback callback; final int size = encoder.getEncodeSize(); @@ -2896,7 +2896,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal { for (String dataFile : dataFiles) { - SequentialFile file = fileFactory.createSequentialFile(dataFile, 1); + SequentialFile file = fileFactory.createSequentialFile(dataFile); if (file.exists()) { file.delete(); @@ -2905,7 +2905,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal for (String newFile : newFiles) { - SequentialFile file = fileFactory.createSequentialFile(newFile, 1); + SequentialFile file = fileFactory.createSequentialFile(newFile); if (file.exists()) { final String originalName = file.getFileName(); @@ -2916,8 +2916,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal for (Pair<String, String> rename : renames) { - SequentialFile fileTmp = fileFactory.createSequentialFile(rename.getA(), 1); - SequentialFile fileTo = fileFactory.createSequentialFile(rename.getB(), 1); + SequentialFile fileTmp = fileFactory.createSequentialFile(rename.getA()); + SequentialFile fileTo = fileFactory.createSequentialFile(rename.getB()); // We should do the rename only if the tmp file still exist, or else we could // delete a valid file depending on where the crash occurred during the control file delete if (fileTmp.exists()) @@ -2951,7 +2951,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal for (String fileToDelete : leftFiles) { ActiveMQJournalLogger.LOGGER.deletingOrphanedFile(fileToDelete); - SequentialFile file = fileFactory.createSequentialFile(fileToDelete, 1); + SequentialFile file = fileFactory.createSequentialFile(fileToDelete); file.delete(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFile.java deleted file mode 100644 index 0dac7f2..0000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFile.java +++ /dev/null @@ -1,404 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.journal.impl; - -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.concurrent.Executor; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; - -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; -import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; -import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; -import org.apache.activemq.artemis.core.journal.SequentialFile; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; -import org.apache.activemq.artemis.journal.ActiveMQJournalBundle; -import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; - -public final class NIOSequentialFile extends AbstractSequentialFile -{ - private FileChannel channel; - - private RandomAccessFile rfile; - - /** - * The write semaphore here is only used when writing asynchronously - */ - private Semaphore maxIOSemaphore; - - private final int defaultMaxIO; - - private int maxIO; - - public NIOSequentialFile(final SequentialFileFactory factory, - final File directory, - final String file, - final int maxIO, - final Executor writerExecutor) - { - super(directory, file, factory, writerExecutor); - defaultMaxIO = maxIO; - } - - public int getAlignment() - { - return 1; - } - - public int calculateBlockStart(final int position) - { - return position; - } - - public synchronized boolean isOpen() - { - return channel != null; - } - - /** - * this.maxIO represents the default maxIO. - * Some operations while initializing files on the journal may require a different maxIO - */ - public synchronized void open() throws IOException - { - open(defaultMaxIO, true); - } - - public void open(final int maxIO, final boolean useExecutor) throws IOException - { - try - { - rfile = new RandomAccessFile(getFile(), "rw"); - - channel = rfile.getChannel(); - - fileSize = channel.size(); - } - catch (IOException e) - { - factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); - throw e; - } - - if (writerExecutor != null && useExecutor) - { - maxIOSemaphore = new Semaphore(maxIO); - this.maxIO = maxIO; - } - } - - public void fill(final int position, final int size, final byte fillCharacter) throws IOException - { - ByteBuffer bb = ByteBuffer.allocate(size); - - for (int i = 0; i < size; i++) - { - bb.put(fillCharacter); - } - - bb.flip(); - - try - { - channel.position(position); - channel.write(bb); - channel.force(false); - channel.position(0); - } - catch (IOException e) - { - factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); - throw e; - } - - fileSize = channel.size(); - } - - public synchronized void waitForClose() throws InterruptedException - { - while (isOpen()) - { - wait(); - } - } - - @Override - public synchronized void close() throws IOException, InterruptedException, ActiveMQException - { - super.close(); - - if (maxIOSemaphore != null) - { - while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS)) - { - ActiveMQJournalLogger.LOGGER.errorClosingFile(getFileName()); - } - } - - maxIOSemaphore = null; - try - { - if (channel != null) - { - channel.close(); - } - - if (rfile != null) - { - rfile.close(); - } - } - catch (IOException e) - { - factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); - throw e; - } - channel = null; - - rfile = null; - - notifyAll(); - } - - public int read(final ByteBuffer bytes) throws Exception - { - return read(bytes, null); - } - - public synchronized int read(final ByteBuffer bytes, final IOAsyncTask callback) throws IOException, - ActiveMQIllegalStateException - { - try - { - if (channel == null) - { - throw new ActiveMQIllegalStateException("File " + this.getFileName() + " has a null channel"); - } - int bytesRead = channel.read(bytes); - - if (callback != null) - { - callback.done(); - } - - bytes.flip(); - - return bytesRead; - } - catch (IOException e) - { - if (callback != null) - { - callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getLocalizedMessage()); - } - - factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); - - throw e; - } - } - - public void sync() throws IOException - { - if (channel != null) - { - try - { - channel.force(false); - } - catch (IOException e) - { - factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); - throw e; - } - } - } - - public long size() throws IOException - { - if (channel == null) - { - return getFile().length(); - } - - try - { - return channel.size(); - } - catch (IOException e) - { - factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); - throw e; - } - } - - @Override - public void position(final long pos) throws IOException - { - try - { - super.position(pos); - channel.position(pos); - } - catch (IOException e) - { - factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); - throw e; - } - } - - @Override - public String toString() - { - return "NIOSequentialFile " + getFile(); - } - - public SequentialFile cloneFile() - { - return new NIOSequentialFile(factory, directory, getFileName(), maxIO, writerExecutor); - } - - public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) - { - if (callback == null) - { - throw new NullPointerException("callback parameter need to be set"); - } - - try - { - internalWrite(bytes, sync, callback); - } - catch (Exception e) - { - callback.onError(ActiveMQExceptionType.GENERIC_EXCEPTION.getCode(), e.getMessage()); - } - } - - public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception - { - internalWrite(bytes, sync, null); - } - - public void writeInternal(final ByteBuffer bytes) throws Exception - { - internalWrite(bytes, true, null); - } - - @Override - protected ByteBuffer newBuffer(int size, final int limit) - { - // For NIO, we don't need to allocate a buffer the entire size of the timed buffer, unlike AIO - - size = limit; - - return super.newBuffer(size, limit); - } - - private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws IOException, ActiveMQIOErrorException, InterruptedException - { - if (!isOpen()) - { - if (callback != null) - { - callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "File not opened"); - } - else - { - throw ActiveMQJournalBundle.BUNDLE.fileNotOpened(); - } - return; - } - - position.addAndGet(bytes.limit()); - - if (maxIOSemaphore == null || callback == null) - { - // if maxIOSemaphore == null, that means we are not using executors and the writes are synchronous - try - { - doInternalWrite(bytes, sync, callback); - } - catch (IOException e) - { - factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); - } - } - else - { - // This is a flow control on writing, just like maxAIO on libaio - maxIOSemaphore.acquire(); - - writerExecutor.execute(new Runnable() - { - public void run() - { - try - { - try - { - doInternalWrite(bytes, sync, callback); - } - catch (IOException e) - { - ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e); - factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), NIOSequentialFile.this); - callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage()); - } - catch (Throwable e) - { - ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e); - callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage()); - } - } - finally - { - maxIOSemaphore.release(); - } - } - }); - } - } - - /** - * @param bytes - * @param sync - * @param callback - * @throws IOException - * @throws Exception - */ - private void doInternalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws IOException - { - channel.write(bytes); - - if (sync) - { - sync(); - } - - if (callback != null) - { - callback.done(); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFileFactory.java deleted file mode 100644 index e471928..0000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFileFactory.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.journal.impl; - -import java.io.File; -import java.lang.ref.WeakReference; -import java.nio.ByteBuffer; - -import org.apache.activemq.artemis.core.journal.IOCriticalErrorListener; -import org.apache.activemq.artemis.core.journal.SequentialFile; - -public class NIOSequentialFileFactory extends AbstractSequentialFileFactory -{ - public NIOSequentialFileFactory(final File journalDir) - { - this(journalDir, null); - } - - public NIOSequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener) - { - this(journalDir, - false, - JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, - JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, - false, - listener); - } - - public NIOSequentialFileFactory(final File journalDir, final boolean buffered) - { - this(journalDir, buffered, null); - } - - public NIOSequentialFileFactory(final File journalDir, - final boolean buffered, - final IOCriticalErrorListener listener) - { - this(journalDir, - buffered, - JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, - JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, - false, - listener); - } - - public NIOSequentialFileFactory(final File journalDir, - final boolean buffered, - final int bufferSize, - final int bufferTimeout, - final boolean logRates) - { - this(journalDir, buffered, bufferSize, bufferTimeout, logRates, null); - } - - public NIOSequentialFileFactory(final File journalDir, - final boolean buffered, - final int bufferSize, - final int bufferTimeout, - final boolean logRates, - final IOCriticalErrorListener listener) - { - super(journalDir, buffered, bufferSize, bufferTimeout, logRates, listener); - } - - public SequentialFile createSequentialFile(final String fileName, int maxIO) - { - if (maxIO < 1) - { - // A single threaded IO - maxIO = 1; - } - - return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor); - } - - public boolean isSupportsCallbacks() - { - return timedBuffer != null; - } - - - public ByteBuffer allocateDirectBuffer(final int size) - { - // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467 - ByteBuffer buffer2 = null; - try - { - buffer2 = ByteBuffer.allocateDirect(size); - } - catch (OutOfMemoryError error) - { - // This is a workaround for the way the JDK will deal with native buffers. - // the main portion is outside of the VM heap - // and the JDK will not have any reference about it to take GC into account - // so we force a GC and try again. - WeakReference<Object> obj = new WeakReference<Object>(new Object()); - try - { - long timeout = System.currentTimeMillis() + 5000; - while (System.currentTimeMillis() > timeout && obj.get() != null) - { - System.gc(); - Thread.sleep(100); - } - } - catch (InterruptedException e) - { - } - - buffer2 = ByteBuffer.allocateDirect(size); - - } - return buffer2; - } - - public void releaseDirectBuffer(ByteBuffer buffer) - { - // nothing we can do on this case. we can just have good faith on GC - } - - public ByteBuffer newBuffer(final int size) - { - return ByteBuffer.allocate(size); - } - - public void clearBuffer(final ByteBuffer buffer) - { - final int limit = buffer.limit(); - buffer.rewind(); - - for (int i = 0; i < limit; i++) - { - buffer.put((byte)0); - } - - buffer.rewind(); - } - - public ByteBuffer wrapBuffer(final byte[] bytes) - { - return ByteBuffer.wrap(bytes); - } - - public int getAlignment() - { - return 1; - } - - public int calculateBlockSize(final int bytes) - { - return bytes; - } - -}
