http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/impl/AsynchronousFileImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/impl/AsynchronousFileImpl.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/impl/AsynchronousFileImpl.java new file mode 100644 index 0000000..9ee4b0a --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/impl/AsynchronousFileImpl.java @@ -0,0 +1,819 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.asyncio.impl; + +import java.nio.ByteBuffer; +import java.nio.channels.FileLock; +import java.util.PriorityQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQExceptionType; +import org.apache.activemq6.core.asyncio.AIOCallback; +import org.apache.activemq6.core.asyncio.AsynchronousFile; +import org.apache.activemq6.core.asyncio.BufferCallback; +import org.apache.activemq6.core.asyncio.IOExceptionListener; +import org.apache.activemq6.core.libaio.Native; +import org.apache.activemq6.journal.HornetQJournalLogger; +import org.apache.activemq6.utils.ReusableLatch; + +/** + * AsynchronousFile implementation + * + * @author [email protected] + * Warning: Case you refactor the name or the package of this class + * You need to make sure you also rename the C++ native calls + */ +public class AsynchronousFileImpl implements AsynchronousFile +{ + // Static ---------------------------------------------------------------------------- + + private static final AtomicInteger totalMaxIO = new AtomicInteger(0); + + private static boolean loaded = false; + + /** + * This definition needs to match Version.h on the native sources. + * <p/> + * Or else the native module won't be loaded because of version mismatches + */ + private static final int EXPECTED_NATIVE_VERSION = 52; + + /** + * Used to determine the next writing sequence + */ + private final AtomicLong nextWritingSequence = new AtomicLong(0); + + /** + * Used to determine the next writing sequence. 
+ * This is accessed from a single thread (the Poller Thread) + */ + private long nextReadSequence = 0; + + /** + * AIO can't guarantee ordering over callbacks. + * <p/> + * We use this {@link PriorityQueue} to hold values until they are in order + */ + private final PriorityQueue<CallbackHolder> pendingCallbacks = new PriorityQueue<CallbackHolder>(); + + public static void addMax(final int io) + { + AsynchronousFileImpl.totalMaxIO.addAndGet(io); + } + + /** + * For test purposes + */ + public static int getTotalMaxIO() + { + return AsynchronousFileImpl.totalMaxIO.get(); + } + + public static void resetMaxAIO() + { + AsynchronousFileImpl.totalMaxIO.set(0); + } + + public static int openFile(String fileName) + { + return Native.openFile(fileName); + } + + public static void closeFile(int handle) + { + Native.closeFile(handle); + } + + public static void destroyBuffer(ByteBuffer buffer) + { + Native.destroyBuffer(buffer); + } + + private static boolean loadLibrary(final String name) + { + try + { + HornetQJournalLogger.LOGGER.trace(name + " being loaded"); + System.loadLibrary(name); + if (Native.getNativeVersion() != AsynchronousFileImpl.EXPECTED_NATIVE_VERSION) + { + HornetQJournalLogger.LOGGER.incompatibleNativeLibrary(); + return false; + } + else + { + return true; + } + } + catch (Throwable e) + { + HornetQJournalLogger.LOGGER.debug(name + " -> error loading the native library", e); + return false; + } + + } + + static + { + String[] libraries = new String[]{"HornetQAIO", "HornetQAIO64", "HornetQAIO32", "HornetQAIO_ia64"}; + + for (String library : libraries) + { + if (AsynchronousFileImpl.loadLibrary(library)) + { + AsynchronousFileImpl.loaded = true; + break; + } + else + { + HornetQJournalLogger.LOGGER.debug("Library " + library + " not found!"); + } + } + + if (!AsynchronousFileImpl.loaded) + { + HornetQJournalLogger.LOGGER.debug("Couldn't locate LibAIO Wrapper"); + } + } + + public static boolean isLoaded() + { + return AsynchronousFileImpl.loaded; + } + + 
// Attributes ------------------------------------------------------------------------ + + private boolean opened = false; + + private String fileName; + + /** + * Used while inside the callbackDone and callbackError + */ + private final Lock callbackLock = new ReentrantLock(); + + private final ReusableLatch pollerLatch = new ReusableLatch(); + + private volatile Runnable poller; + + private int maxIO; + + private final Lock writeLock = new ReentrantReadWriteLock().writeLock(); + + private final ReusableLatch pendingWrites = new ReusableLatch(); + + private Semaphore maxIOSemaphore; + + private BufferCallback bufferCallback; + + /** + * A callback for IO errors when they happen + */ + private final IOExceptionListener ioExceptionListener; + + /** + * Warning: Beware of the C++ pointer! It will bite you! :-) + */ + private ByteBuffer handler; + + // A context switch on AIO would make it to synchronize the disk before + // switching to the new thread, what would cause + // serious performance problems. Because of that we make all the writes on + // AIO using a single thread. + private final Executor writeExecutor; + + private final Executor pollerExecutor; + + // AsynchronousFile implementation --------------------------------------------------- + + /** + * @param writeExecutor It needs to be a single Thread executor. 
If null it will use the user thread to execute write operations + * @param pollerExecutor The thread pool that will initialize poller handlers + */ + public AsynchronousFileImpl(final Executor writeExecutor, final Executor pollerExecutor, final IOExceptionListener ioExceptionListener) + { + this.writeExecutor = writeExecutor; + this.pollerExecutor = pollerExecutor; + this.ioExceptionListener = ioExceptionListener; + } + + public AsynchronousFileImpl(final Executor writeExecutor, final Executor pollerExecutor) + { + this(writeExecutor, pollerExecutor, null); + } + + public void open(final String fileName1, final int maxIOArgument) throws HornetQException + { + writeLock.lock(); + + try + { + if (opened) + { + throw new IllegalStateException("AsynchronousFile is already opened"); + } + + this.maxIO = maxIOArgument; + maxIOSemaphore = new Semaphore(this.maxIO); + + this.fileName = fileName1; + + try + { + handler = Native.init(AsynchronousFileImpl.class, fileName1, this.maxIO, HornetQJournalLogger.LOGGER); + } + catch (HornetQException e) + { + HornetQException ex = null; + if (e.getType() == HornetQExceptionType.NATIVE_ERROR_CANT_INITIALIZE_AIO) + { + ex = new HornetQException(e.getType(), + "Can't initialize AIO. 
Currently AIO in use = " + AsynchronousFileImpl.totalMaxIO.get() + + ", trying to allocate more " + + maxIOArgument, + e); + } + else + { + ex = e; + } + throw ex; + } + opened = true; + AsynchronousFileImpl.addMax(this.maxIO); + nextWritingSequence.set(0); + nextReadSequence = 0; + } + finally + { + writeLock.unlock(); + } + } + + public void close() throws InterruptedException, HornetQException + { + checkOpened(); + + writeLock.lock(); + + try + { + + while (!pendingWrites.await(60000)) + { + HornetQJournalLogger.LOGGER.couldNotGetLock(fileName); + } + + while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS)) + { + HornetQJournalLogger.LOGGER.couldNotGetLock(fileName); + } + + maxIOSemaphore = null; + if (poller != null) + { + stopPoller(); + } + + if (handler != null) + { + Native.closeInternal(handler); + AsynchronousFileImpl.addMax(-maxIO); + } + opened = false; + handler = null; + } + finally + { + writeLock.unlock(); + } + } + + + public void writeInternal(long positionToWrite, long size, ByteBuffer bytes) throws HornetQException + { + try + { + Native.writeInternal(handler, positionToWrite, size, bytes); + } + catch (HornetQException e) + { + fireExceptionListener(e.getType().getCode(), e.getMessage()); + throw e; + } + if (bufferCallback != null) + { + bufferCallback.bufferDone(bytes); + } + } + + + public void write(final long position, + final long size, + final ByteBuffer directByteBuffer, + final AIOCallback aioCallback) + { + if (aioCallback == null) + { + throw new NullPointerException("Null Callback"); + } + + checkOpened(); + if (poller == null) + { + startPoller(); + } + + pendingWrites.countUp(); + + if (writeExecutor != null) + { + maxIOSemaphore.acquireUninterruptibly(); + + writeExecutor.execute(new Runnable() + { + public void run() + { + long sequence = nextWritingSequence.getAndIncrement(); + + try + { + Native.write(AsynchronousFileImpl.this, handler, sequence, position, size, directByteBuffer, aioCallback); + } + catch 
(HornetQException e) + { + callbackError(aioCallback, sequence, directByteBuffer, e.getType().getCode(), e.getMessage()); + } + catch (RuntimeException e) + { + callbackError(aioCallback, + sequence, + directByteBuffer, + HornetQExceptionType.INTERNAL_ERROR.getCode(), + e.getMessage()); + } + } + }); + } + else + { + maxIOSemaphore.acquireUninterruptibly(); + + long sequence = nextWritingSequence.getAndIncrement(); + + try + { + Native.write(this, handler, sequence, position, size, directByteBuffer, aioCallback); + } + catch (HornetQException e) + { + callbackError(aioCallback, sequence, directByteBuffer, e.getType().getCode(), e.getMessage()); + } + catch (RuntimeException e) + { + callbackError(aioCallback, sequence, directByteBuffer, HornetQExceptionType.INTERNAL_ERROR.getCode(), e.getMessage()); + } + } + + } + + public void read(final long position, + final long size, + final ByteBuffer directByteBuffer, + final AIOCallback aioPackage) throws HornetQException + { + checkOpened(); + if (poller == null) + { + startPoller(); + } + pendingWrites.countUp(); + maxIOSemaphore.acquireUninterruptibly(); + try + { + Native.read(this, handler, position, size, directByteBuffer, aioPackage); + } + catch (HornetQException e) + { + // Release only if an exception happened + maxIOSemaphore.release(); + pendingWrites.countDown(); + throw e; + } + catch (RuntimeException e) + { + // Release only if an exception happened + maxIOSemaphore.release(); + pendingWrites.countDown(); + throw e; + } + } + + public long size() throws HornetQException + { + checkOpened(); + return Native.size0(handler); + } + + public void fill(final long position, final int blocks, final long size, final byte fillChar) throws HornetQException + { + checkOpened(); + try + { + Native.fill(handler, position, blocks, size, fillChar); + } + catch (HornetQException e) + { + fireExceptionListener(e.getType().getCode(), e.getMessage()); + throw e; + } + } + + public int getBlockSize() + { + return 512; + } + + 
/** + * This needs to be synchronized because of + * http://bugs.sun.com/view_bug.do?bug_id=6791815 + * http://mail.openjdk.java.net/pipermail/hotspot-runtime-dev/2009-January/000386.html + */ + public static synchronized ByteBuffer newBuffer(final int size) + { + if (size % 512 != 0) + { + throw new RuntimeException("Buffer size needs to be aligned to 512"); + } + + return Native.newNativeBuffer(size); + } + + public void setBufferCallback(final BufferCallback callback) + { + bufferCallback = callback; + } + + /** + * Return the JNI handler used on C++ + */ + public ByteBuffer getHandler() + { + return handler; + } + + public static void clearBuffer(final ByteBuffer buffer) + { + Native.resetBuffer(buffer, buffer.limit()); + buffer.position(0); + } + + // Protected ------------------------------------------------------------------------- + + @Override + protected void finalize() + { + if (opened) + { + HornetQJournalLogger.LOGGER.fileFinalizedWhileOpen(fileName); + } + } + + // Private --------------------------------------------------------------------------- + + private void callbackDone(final AIOCallback callback, final long sequence, final ByteBuffer buffer) + { + maxIOSemaphore.release(); + + pendingWrites.countDown(); + + callbackLock.lock(); + + try + { + + if (sequence == -1) + { + callback.done(); + } + else + { + if (sequence == nextReadSequence) + { + nextReadSequence++; + callback.done(); + flushCallbacks(); + } + else + { + pendingCallbacks.add(new CallbackHolder(sequence, callback)); + } + } + + // The buffer is not sent on callback for read operations + if (bufferCallback != null && buffer != null) + { + bufferCallback.bufferDone(buffer); + } + } + finally + { + callbackLock.unlock(); + } + } + + private void flushCallbacks() + { + while (!pendingCallbacks.isEmpty() && pendingCallbacks.peek().sequence == nextReadSequence) + { + CallbackHolder holder = pendingCallbacks.poll(); + if (holder.isError()) + { + ErrorCallback error = (ErrorCallback) 
holder; + holder.callback.onError(error.errorCode, error.message); + } + else + { + holder.callback.done(); + } + nextReadSequence++; + } + } + + // Called by the JNI layer.. just ignore the + // warning + private void callbackError(final AIOCallback callback, + final long sequence, + final ByteBuffer buffer, + final int errorCode, + final String errorMessage) + { + HornetQJournalLogger.LOGGER.callbackError(errorMessage); + + fireExceptionListener(errorCode, errorMessage); + + maxIOSemaphore.release(); + + pendingWrites.countDown(); + + callbackLock.lock(); + + try + { + if (sequence == -1) + { + callback.onError(errorCode, errorMessage); + } + else + { + if (sequence == nextReadSequence) + { + nextReadSequence++; + callback.onError(errorCode, errorMessage); + flushCallbacks(); + } + else + { + pendingCallbacks.add(new ErrorCallback(sequence, callback, errorCode, errorMessage)); + } + } + } + finally + { + callbackLock.unlock(); + } + + // The buffer is not sent on callback for read operations + if (bufferCallback != null && buffer != null) + { + bufferCallback.bufferDone(buffer); + } + } + + /** + * This is called by the native layer + * + * @param errorCode + * @param errorMessage + */ + private void fireExceptionListener(final int errorCode, final String errorMessage) + { + HornetQJournalLogger.LOGGER.ioError(errorCode, errorMessage); + if (ioExceptionListener != null) + { + ioExceptionListener.onIOException(HornetQExceptionType.getType(errorCode).createException(errorMessage), errorMessage); + } + } + + private void pollEvents() + { + if (!opened) + { + return; + } + Native.internalPollEvents(handler); + } + + private void startPoller() + { + writeLock.lock(); + + try + { + + if (poller == null) + { + pollerLatch.countUp(); + poller = new PollerRunnable(); + try + { + pollerExecutor.execute(poller); + } + catch (Exception ex) + { + HornetQJournalLogger.LOGGER.errorStartingPoller(ex); + } + } + } + finally + { + writeLock.unlock(); + } + } + + private void 
checkOpened() + { + if (!opened) + { + throw new RuntimeException("File is not opened"); + } + } + + /** + * @throws HornetQException + * @throws InterruptedException + */ + private void stopPoller() throws HornetQException, InterruptedException + { + Native.stopPoller(handler); + // We need to make sure we won't call close until Poller is + // completely done, or we might get beautiful GPFs + pollerLatch.await(); + } + + public static FileLock lock(int handle) + { + if (Native.flock(handle)) + { + return new HornetQFileLock(handle); + } + else + { + return null; + } + } + + // Native ---------------------------------------------------------------------------- + + + /** + * Explicitly adding a compare to clause that returns 0 for at least the same object. + * <p/> + * If {@link Comparable#compareTo(Object)} does not return 0 -for at least the same object- some + * Collection classes methods will fail (example {@link PriorityQueue#remove(Object)}. If it + * returns 0, then {@link #equals(Object)} must return {@code true} for the exact same cases, + * otherwise we will get compatibility problems between Java5 and Java6. + */ + private static class CallbackHolder implements Comparable<CallbackHolder> + { + final long sequence; + + final AIOCallback callback; + + public boolean isError() + { + return false; + } + + public CallbackHolder(final long sequence, final AIOCallback callback) + { + this.sequence = sequence; + this.callback = callback; + } + + public int compareTo(final CallbackHolder o) + { + // It shouldn't be equals in any case + if (this == o) + return 0; + if (sequence <= o.sequence) + { + return -1; + } + else + { + return 1; + } + } + + /** + * See {@link CallbackHolder}. + */ + @Override + public int hashCode() + { + return super.hashCode(); + } + + /** + * See {@link CallbackHolder}. 
+ */ + @Override + public boolean equals(Object obj) + { + return super.equals(obj); + } + } + + private static final class ErrorCallback extends CallbackHolder + { + final int errorCode; + + final String message; + + @Override + public boolean isError() + { + return true; + } + + public ErrorCallback(final long sequence, final AIOCallback callback, final int errorCode, final String message) + { + super(sequence, callback); + + this.errorCode = errorCode; + + this.message = message; + } + + /** + * See {@link CallbackHolder}. + */ + @Override + public int hashCode() + { + return super.hashCode(); + } + + /** + * See {@link CallbackHolder}. + */ + @Override + public boolean equals(Object obj) + { + return super.equals(obj); + } + } + + private class PollerRunnable implements Runnable + { + PollerRunnable() + { + } + + public void run() + { + try + { + pollEvents(); + } + finally + { + // This gives us extra protection in cases of interruption + // Case the poller thread is interrupted, this will allow us to + // restart the thread when required + poller = null; + pollerLatch.countDown(); + } + } + } + +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/impl/HornetQFileLock.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/impl/HornetQFileLock.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/impl/HornetQFileLock.java new file mode 100644 index 0000000..ba2514a --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/impl/HornetQFileLock.java @@ -0,0 +1,47 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.asyncio.impl; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; + +import org.apache.activemq6.core.libaio.Native; + +/** + * A HornetQFileLock + * @author clebertsuconic + */ +public class HornetQFileLock extends FileLock +{ + + private final int handle; + + protected HornetQFileLock(final int handle) + { + super((FileChannel)null, 0, 0, false); + this.handle = handle; + } + + @Override + public boolean isValid() + { + return true; + } + + @Override + public void release() throws IOException + { + Native.closeFile(handle); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/EncodingSupport.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/EncodingSupport.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/EncodingSupport.java new file mode 100644 index 0000000..f531434 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/EncodingSupport.java @@ -0,0 +1,33 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
 */
package org.apache.activemq6.core.journal;

import org.apache.activemq6.api.core.HornetQBuffer;

/**
 *
 * This interface provides encoding support for the Journal.
 *
 *
 * @author <a href="mailto:[email protected]">Clebert Suconic</a>
 * @author <a href="mailto:[email protected]">Tim Fox</a>
 *
 */
public interface EncodingSupport
{
   // NOTE(review): presumably the number of bytes encode(HornetQBuffer) writes - confirm
   // against the implementations; the contract is not stated in this file.
   int getEncodeSize();

   void encode(HornetQBuffer buffer);

   void decode(HornetQBuffer buffer);
}
// http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOAsyncTask.java
// ----------------------------------------------------------------------
// diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOAsyncTask.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOAsyncTask.java
// new file mode 100644
// index 0000000..e130481
// --- /dev/null
// +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOAsyncTask.java
// @@ -0,0 +1,27 @@
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.activemq6.core.journal;

import org.apache.activemq6.core.asyncio.AIOCallback;

/**
 *
 * This class is just a direct extension of AIOCallback.
 * Just to avoid the direct dependency of org.apache.activemq6.core.asyncio.AIOCallback from the journal.
 *
 * @author <a href="mailto:[email protected]">Clebert Suconic</a>
 *
 */
public interface IOAsyncTask extends AIOCallback
{
   // Intentionally empty: no members are added to those inherited from AIOCallback.
}
// http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOCompletion.java
// ----------------------------------------------------------------------
// diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOCompletion.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOCompletion.java
// new file mode 100644
// index 0000000..f1d5915
// --- /dev/null
// +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOCompletion.java
// @@ -0,0 +1,25 @@
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.activemq6.core.journal;

/**
 * A IOCompletion
 *
 * @author <a href="mailto:[email protected]">Clebert Suconic</a>
 *
 *
 */
public interface IOCompletion extends IOAsyncTask
{
   // NOTE(review): semantics are not visible in this file. The Javadoc of
   // Journal#appendCommitRecord(long, boolean, IOCompletion, boolean) mentions that the journal
   // may call this on behalf of the caller - confirm against the implementations.
   void storeLineUp();
}
// \ No newline at end of file
// http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOCriticalErrorListener.java
// ----------------------------------------------------------------------
// diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOCriticalErrorListener.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOCriticalErrorListener.java
// new file mode 100644
// index 0000000..fba45e8
// --- /dev/null
// +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOCriticalErrorListener.java
// @@ -0,0 +1,25 @@
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.activemq6.core.journal;

/**
 * A IOCriticalErrorListener
 *
 * @author clebert
 *
 *
 */
public interface IOCriticalErrorListener
{
   // Note that the first parameter, although named "code", is the Exception itself.
   void onIOException(Exception code, String message, SequentialFile file);
}
// http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/Journal.java
// ----------------------------------------------------------------------
// diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/Journal.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/Journal.java
// new file mode 100644
// index 0000000..85c65e9
// --- /dev/null
// +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/Journal.java
// @@ -0,0 +1,227 @@
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.activemq6.core.journal;

import java.util.List;
import java.util.Map;

import org.apache.activemq6.core.journal.impl.JournalFile;
import org.apache.activemq6.core.server.HornetQComponent;

/**
 * Most methods on the journal provide a blocking version where you select the sync mode and a non
 * blocking mode where you pass a completion callback as a parameter.
 * <p>
 * Notice also that even on the callback methods it's possible to pass the sync mode. That will only
 * make sense on the NIO operations.
+ * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * @see org.apache.activemq6.utils.IDGenerator + */ +public interface Journal extends HornetQComponent +{ + enum JournalState + { + STOPPED, + /** + * The journal has some fields initialized and services running. But it is not fully + * operational. See {@link JournalState#LOADED}. + */ + STARTED, + /** + * When a replicating server is still not synchronized with its live. So if the live stops, + * the backup may not fail-over and will stop as well. + */ + SYNCING, + /** + * Journal is being used by a replicating server which is up-to-date with its live. That means + * that if the live stops, the backup can fail-over. + */ + SYNCING_UP_TO_DATE, + /** + * The journal is fully operational. This is the state the journal should be when its server + * is live. + */ + LOADED; + } + + // Non transactional operations + + void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception; + + void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception; + + void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion completionCallback) throws Exception; + + void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception; + + void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception; + + void appendUpdateRecord(long id, + byte recordType, + EncodingSupport record, + boolean sync, + IOCompletion completionCallback) throws Exception; + + void appendDeleteRecord(long id, boolean sync) throws Exception; + + void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception; + + // Transactional operations + + void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception; + + void 
appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception; + + void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception; + + void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception; + + void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception; + + void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception; + + void appendDeleteRecordTransactional(long txID, long id) throws Exception; + + void appendCommitRecord(long txID, boolean sync) throws Exception; + + void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception; + + /** + * @param txID + * @param sync + * @param callback + * @param lineUpContext if appendCommitRecord should call a storeLineUp. This is because the + * caller may have already taken into account + * @throws Exception + */ + void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception; + + /** + * + * <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction + * back to a state it could be committed. 
</p> + * + * <p> transactionData allows you to store any other supporting user-data related to the transaction</p> + * + * @param txID + * @param transactionData - extra user data for the prepare + * @throws Exception + */ + void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception; + + void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync, IOCompletion callback) throws Exception; + + void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception; + + void appendRollbackRecord(long txID, boolean sync) throws Exception; + + void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception; + + // Load + + JournalLoadInformation load(LoaderCallback reloadManager) throws Exception; + + /** + * Load internal data structures and not expose any data. This is only useful if you're using the + * journal but not interested on the current data. Useful in situations where the journal is + * being replicated, copied... etc. + */ + JournalLoadInformation loadInternalOnly() throws Exception; + + /** + * Load internal data structures, and remain waiting for synchronization to complete. + * @param state the current state of the journal, this parameter ensures consistency. + */ + JournalLoadInformation loadSyncOnly(JournalState state) throws Exception; + + void lineUpContext(IOCompletion callback); + + JournalLoadInformation load(List<RecordInfo> committedRecords, + List<PreparedTransactionInfo> preparedTransactions, + TransactionFailureCallback transactionFailure) throws Exception; + + int getAlignment() throws Exception; + + int getNumberOfRecords(); + + int getUserVersion(); + + void perfBlast(int pages); + + void runDirectJournalBlast() throws Exception; + + /** + * Reserves journal file IDs, creates the necessary files for synchronization, and places + * references to these (reserved for sync) files in the map. 
+ * <p> + * During the synchronization between a live server and backup, we reserve in the backup the + * journal file IDs used in the live server. This call also makes sure the files are created + * empty without any kind of headers added. + * @param fileIds IDs to reserve for synchronization + * @return map to be filled with id and journal file pairs for <b>synchronization</b>. + * @throws Exception + */ + Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws Exception; + + /** + * Write lock the Journal and write lock the compacting process. Necessary only during + * replication for backup synchronization. + */ + void synchronizationLock(); + + /** + * Unlock the Journal and the compacting process. + * @see Journal#synchronizationLock() + */ + void synchronizationUnlock(); + + /** + * Force the usage of a new {@link JournalFile}. + * @throws Exception + */ + void forceMoveNextFile() throws Exception; + + /** + * Returns the {@link JournalFile}s in use. + * @return array with all {@link JournalFile}s in use + */ + JournalFile[] getDataFiles(); + + SequentialFileFactory getFileFactory(); + + int getFileSize(); + + /** + * This method will start compacting using the compactorExecutor and block up to timeout seconds + * @param timeout the timeout in seconds or block forever if <= 0 + * @throws Exception + */ + void scheduleCompactAndBlock(int timeout) throws Exception; + + /** + * Stops any operation that may delete or modify old (stale) data. + * <p> + * Meant to be used during synchronization of data between a live server and its replicating + * (remote) backup. Old files must not be compacted or deleted during synchronization. + */ + void replicationSyncPreserveOldFiles(); + + /** + * Restarts file reclaim and compacting on the journal. + * <p> + * Meant to be used to revert the effect of {@link #replicationSyncPreserveOldFiles()}. It should + * only be called once the synchronization of the backup and live servers is completed. 
+ */ + void replicationSyncFinished(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/JournalLoadInformation.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/JournalLoadInformation.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/JournalLoadInformation.java new file mode 100644 index 0000000..ed1b18d --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/JournalLoadInformation.java @@ -0,0 +1,116 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal; + +/** + * This is a POJO containing information about the journal during load time. 
+ * @author <mailto:[email protected]">Clebert Suconic</a> + */ +public class JournalLoadInformation +{ + + private int numberOfRecords = 0; + + private long maxID = -1; + + public JournalLoadInformation() + { + super(); + } + + /** + * @param numberOfRecords + * @param maxID + */ + public JournalLoadInformation(final int numberOfRecords, final long maxID) + { + super(); + this.numberOfRecords = numberOfRecords; + this.maxID = maxID; + } + + /** + * @return the numberOfRecords + */ + public int getNumberOfRecords() + { + return numberOfRecords; + } + + /** + * @param numberOfRecords the numberOfRecords to set + */ + public void setNumberOfRecords(final int numberOfRecords) + { + this.numberOfRecords = numberOfRecords; + } + + /** + * @return the maxID + */ + public long getMaxID() + { + return maxID; + } + + /** + * @param maxID the maxID to set + */ + public void setMaxID(final long maxID) + { + this.maxID = maxID; + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = 1; + result = prime * result + (int)(maxID ^ maxID >>> 32); + result = prime * result + numberOfRecords; + return result; + } + + @Override + public boolean equals(final Object obj) + { + if (this == obj) + { + return true; + } + if (obj == null) + { + return false; + } + if (getClass() != obj.getClass()) + { + return false; + } + JournalLoadInformation other = (JournalLoadInformation)obj; + if (maxID != other.maxID) + { + return false; + } + if (numberOfRecords != other.numberOfRecords) + { + return false; + } + return true; + } + + @Override + public String toString() + { + return "JournalLoadInformation [maxID=" + maxID + ", numberOfRecords=" + numberOfRecords + "]"; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/LoaderCallback.java ---------------------------------------------------------------------- diff --git 
a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/LoaderCallback.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/LoaderCallback.java new file mode 100644 index 0000000..3d1c156 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/LoaderCallback.java @@ -0,0 +1,29 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal; + +/** + * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + */ +public interface LoaderCallback extends TransactionFailureCallback +{ + void addPreparedTransaction(PreparedTransactionInfo preparedTransaction); + + void addRecord(RecordInfo info); + + void deleteRecord(long id); + + void updateRecord(RecordInfo info); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/PreparedTransactionInfo.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/PreparedTransactionInfo.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/PreparedTransactionInfo.java new file mode 100644 index 0000000..9ae3eb0 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/PreparedTransactionInfo.java @@ -0,0 +1,42 @@ +/* + * Copyright 2005-2014 
Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal; + +import java.util.ArrayList; +import java.util.List; + +/** + * + * A PreparedTransactionInfo + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + */ +public class PreparedTransactionInfo +{ + public final long id; + + public final byte[] extraData; + + public final List<RecordInfo> records = new ArrayList<RecordInfo>(); + + public final List<RecordInfo> recordsToDelete = new ArrayList<RecordInfo>(); + + public PreparedTransactionInfo(final long id, final byte[] extraData) + { + this.id = id; + + this.extraData = extraData; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/RecordInfo.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/RecordInfo.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/RecordInfo.java new file mode 100644 index 0000000..fb78b24 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/RecordInfo.java @@ -0,0 +1,86 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal; + +/** + * A RecordInfo + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + */ +public class RecordInfo +{ + public RecordInfo(final long id, final byte userRecordType, final byte[] data, final boolean isUpdate, final short compactCount) + { + this.id = id; + + this.userRecordType = userRecordType; + + this.data = data; + + this.isUpdate = isUpdate; + + this.compactCount = compactCount; + } + + /** + * How many times this record was compacted (up to 7 times) + * After the record has reached 7 times, it will always be 7 + * As we only store up to 0x7 binary, as part of the recordID (binary 111) + */ + public final short compactCount; + + public final long id; + + public final byte userRecordType; + + public final byte[] data; + + public boolean isUpdate; + + public byte getUserRecordType() + { + return userRecordType; + } + + @Override + public int hashCode() + { + return (int) (id >>> 32 ^ id); + } + + @Override + public boolean equals(final Object other) + { + if (!(other instanceof RecordInfo)) + { + return false; + } + RecordInfo r = (RecordInfo) other; + + return r.id == id; + } + + @Override + public String toString() + { + return "RecordInfo (id=" + id + + ", userRecordType = " + + userRecordType + + ", data.length = " + + data.length + + ", isUpdate = " + + 
isUpdate; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/SequentialFile.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/SequentialFile.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/SequentialFile.java new file mode 100644 index 0000000..94cdfb3 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/SequentialFile.java @@ -0,0 +1,131 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.journal; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.core.journal.impl.TimedBuffer; + +/** + * A SequentialFile + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + */ +public interface SequentialFile +{ + /* + * Creates the file if it doesn't already exist, then opens it + */ + void open() throws Exception; + + boolean isOpen(); + + boolean exists(); + + /** + * The maximum number of simultaneous writes accepted + * @param maxIO + * @throws Exception + */ + void open(int maxIO, boolean useExecutor) throws Exception; + + boolean fits(int size); + + int getAlignment() throws Exception; + + int calculateBlockStart(int position) throws Exception; + + String getFileName(); + + void fill(int position, int size, byte fillCharacter) throws Exception; + + void delete() throws IOException, InterruptedException, HornetQException; + + void write(HornetQBuffer bytes, boolean sync, IOAsyncTask callback) throws Exception; + + void write(HornetQBuffer bytes, boolean sync) throws Exception; + + void write(EncodingSupport bytes, boolean sync, IOAsyncTask callback) throws Exception; + + void write(EncodingSupport bytes, boolean sync) throws Exception; + + /** + * Write directly to the file without using any buffer + * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or + * NIO). To be safe, use a buffer from the corresponding + * {@link SequentialFileFactory#newBuffer(int)}. + */ + void writeDirect(ByteBuffer bytes, boolean sync, IOAsyncTask callback); + + /** + * Write directly to the file without using any buffer + * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or + * NIO). 
To be safe, use a buffer from the corresponding + * {@link SequentialFileFactory#newBuffer(int)}. + */ + void writeDirect(ByteBuffer bytes, boolean sync) throws Exception; + + /** + * Write directly to the file. This is used by compacting and other places where we write a big + * buffer in a single shot. writeInternal should always block until the entire write is sync on + * disk. + * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or + * NIO). To be safe, use a buffer from the corresponding + * {@link SequentialFileFactory#newBuffer(int)}. + */ + void writeInternal(ByteBuffer bytes) throws Exception; + + /** + * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or + * NIO). To be safe, use a buffer from the corresponding + * {@link SequentialFileFactory#newBuffer(int)}. + */ + int read(ByteBuffer bytes, IOAsyncTask callback) throws Exception; + + /** + * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or + * NIO). To be safe, use a buffer from the corresponding + * {@link SequentialFileFactory#newBuffer(int)}. + */ + int read(ByteBuffer bytes) throws Exception; + + void position(long pos) throws IOException; + + long position(); + + void close() throws Exception; + + void waitForClose() throws Exception; + + void sync() throws IOException; + + long size() throws Exception; + + void renameTo(String newFileName) throws Exception; + + SequentialFile cloneFile(); + + void copyTo(SequentialFile newFileName) throws Exception; + + void setTimedBuffer(TimedBuffer buffer); + + /** + * Returns a native File of the file underlying this sequential file. 
+ */ + File getJavaFile(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/SequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/SequentialFileFactory.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/SequentialFileFactory.java new file mode 100644 index 0000000..0492641 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/SequentialFileFactory.java @@ -0,0 +1,88 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal; + +import java.nio.ByteBuffer; +import java.util.List; + +/** + * + * A SequentialFileFactory + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + */ +public interface SequentialFileFactory +{ + SequentialFile createSequentialFile(String fileName, int maxIO); + + /** + * Lists files that end with the given extension. + * <p> + * This method inserts a ".' before the extension. 
+ * @param extension + * @return + * @throws Exception + */ + List<String> listFiles(String extension) throws Exception; + + boolean isSupportsCallbacks(); + + /** The SequentialFile will call this method when a disk IO Error happens during the live phase. */ + void onIOError(Exception exception, String message, SequentialFile file); + + /** used for cases where you need direct buffer outside of the journal context. + * This is because the native layer has a method that can be reused in certain cases like paging */ + ByteBuffer allocateDirectBuffer(int size); + + /** used for cases where you need direct buffer outside of the journal context. + * This is because the native layer has a method that can be reused in certain cases like paging */ + void releaseDirectBuffer(ByteBuffer buffer); + + /** + * Note: You need to release the buffer if it is used for reading operations. You don't need to do + * it if using writing operations (AIO Buffer Lister will take care of writing operations) + * @param size + * @return the allocated ByteBuffer + */ + ByteBuffer newBuffer(int size); + + void releaseBuffer(ByteBuffer buffer); + + void activateBuffer(SequentialFile file); + + void deactivateBuffer(); + + // To be used in tests only + ByteBuffer wrapBuffer(byte[] bytes); + + int getAlignment(); + + int calculateBlockSize(int bytes); + + String getDirectory(); + + void clearBuffer(ByteBuffer buffer); + + void start(); + + void stop(); + + /** + * Creates the directory if it does not exist yet. 
+ */ + void createDirs() throws Exception; + + void flush(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/TestableJournal.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/TestableJournal.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/TestableJournal.java new file mode 100644 index 0000000..8f578bb --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/TestableJournal.java @@ -0,0 +1,70 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.journal; + +import org.apache.activemq6.core.journal.impl.JournalFile; + +/** + * + * A TestableJournal + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + */ +public interface TestableJournal extends Journal +{ + int getDataFilesCount(); + + int getFreeFilesCount(); + + int getOpenedFilesCount(); + + int getIDMapSize(); + + String debug() throws Exception; + + void debugWait() throws Exception; + + int getFileSize(); + + int getMinFiles(); + + String getFilePrefix(); + + String getFileExtension(); + + int getMaxAIO(); + + void forceMoveNextFile() throws Exception; + + void setAutoReclaim(boolean autoReclaim); + + boolean isAutoReclaim(); + + void testCompact(); + + JournalFile getCurrentFile(); + + /** + * This method is called automatically when a new file is opened. + * <p> + * It will among other things, remove stale files and make them available for reuse. + * <p> + * This method locks the journal. + * @return true if it needs to re-check due to cleanup or other factors + */ + boolean checkReclaimStatus() throws Exception; + + JournalFile[] getDataFiles(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/TransactionFailureCallback.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/TransactionFailureCallback.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/TransactionFailureCallback.java new file mode 100644 index 0000000..09d66d6 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/TransactionFailureCallback.java @@ -0,0 +1,31 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal; + +import java.util.List; + +/** + * A Callback to receive information about bad transactions for extra cleanup required for broken transactions such as large messages. + * + * @author <mailto:[email protected]">Clebert Suconic</a> + * + * + */ +public interface TransactionFailureCallback +{ + + /** To be used to inform about transactions without commit records. + * This could be used to remove extra resources associated with the transactions (such as external files received during the transaction) */ + void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete); + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AIOSequentialFile.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AIOSequentialFile.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AIOSequentialFile.java new file mode 100644 index 0000000..11236c7 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AIOSequentialFile.java @@ -0,0 +1,329 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.Executor; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.core.asyncio.AsynchronousFile; +import org.apache.activemq6.core.asyncio.BufferCallback; +import org.apache.activemq6.core.asyncio.IOExceptionListener; +import org.apache.activemq6.core.asyncio.impl.AsynchronousFileImpl; +import org.apache.activemq6.core.journal.IOAsyncTask; +import org.apache.activemq6.core.journal.SequentialFile; +import org.apache.activemq6.core.journal.SequentialFileFactory; + +/** + * + * A AIOSequentialFile + * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + */ +public class AIOSequentialFile extends AbstractSequentialFile implements IOExceptionListener +{ + private boolean opened = false; + + private final int maxIO; + + private AsynchronousFile aioFile; + + private final BufferCallback bufferCallback; + + /** The pool for Thread pollers */ + private final Executor pollerExecutor; + + public AIOSequentialFile(final SequentialFileFactory factory, + final int bufferSize, + final long bufferTimeoutMilliseconds, + final String directory, + final String fileName, + final int maxIO, + final BufferCallback bufferCallback, + final Executor writerExecutor, + final Executor pollerExecutor) + { + 
super(directory, new File(directory + "/" + fileName), factory, writerExecutor); + this.maxIO = maxIO; + this.bufferCallback = bufferCallback; + this.pollerExecutor = pollerExecutor; + } + + public boolean isOpen() + { + return opened; + } + + public int getAlignment() + { + checkOpened(); + + return aioFile.getBlockSize(); + } + + public int calculateBlockStart(final int position) + { + int alignment = getAlignment(); + + int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment; + + return pos; + } + + public SequentialFile cloneFile() + { + return new AIOSequentialFile(factory, + -1, + -1, + getFile().getParent(), + getFileName(), + maxIO, + bufferCallback, + writerExecutor, + pollerExecutor); + } + + @Override + public synchronized void close() throws IOException, InterruptedException, HornetQException + { + if (!opened) + { + return; + } + + super.close(); + + opened = false; + + timedBuffer = null; + + aioFile.close(); + aioFile = null; + + notifyAll(); + } + + @Override + public synchronized void waitForClose() throws Exception + { + while (isOpen()) + { + wait(); + } + } + + public void fill(final int position, final int size, final byte fillCharacter) throws Exception + { + checkOpened(); + + int fileblockSize = aioFile.getBlockSize(); + + int blockSize = fileblockSize; + + if (size % (100 * 1024 * 1024) == 0) + { + blockSize = 100 * 1024 * 1024; + } + else if (size % (10 * 1024 * 1024) == 0) + { + blockSize = 10 * 1024 * 1024; + } + else if (size % (1024 * 1024) == 0) + { + blockSize = 1024 * 1024; + } + else if (size % (10 * 1024) == 0) + { + blockSize = 10 * 1024; + } + else + { + blockSize = fileblockSize; + } + + int blocks = size / blockSize; + + if (size % blockSize != 0) + { + blocks++; + } + + int filePosition = position; + + if (position % fileblockSize != 0) + { + filePosition = (position / fileblockSize + 1) * fileblockSize; + } + + aioFile.fill(filePosition, blocks, blockSize, fillCharacter); + + fileSize = 
aioFile.size(); + } + + public void open() throws Exception + { + open(maxIO, true); + } + + public synchronized void open(final int maxIO, final boolean useExecutor) throws HornetQException + { + opened = true; + + aioFile = new AsynchronousFileImpl(useExecutor ? writerExecutor : null, pollerExecutor, this); + + try + { + aioFile.open(getFile().getAbsolutePath(), maxIO); + } + catch (HornetQException e) + { + factory.onIOError(e, e.getMessage(), this); + throw e; + } + + position.set(0); + + aioFile.setBufferCallback(bufferCallback); + + fileSize = aioFile.size(); + } + + public void setBufferCallback(final BufferCallback callback) + { + aioFile.setBufferCallback(callback); + } + + public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws HornetQException + { + int bytesToRead = bytes.limit(); + + long positionToRead = position.getAndAdd(bytesToRead); + + bytes.rewind(); + + aioFile.read(positionToRead, bytesToRead, bytes, callback); + + return bytesToRead; + } + + public int read(final ByteBuffer bytes) throws Exception + { + SimpleWaitIOCallback waitCompletion = new SimpleWaitIOCallback(); + + int bytesRead = read(bytes, waitCompletion); + + waitCompletion.waitCompletion(); + + return bytesRead; + } + + public void sync() + { + throw new UnsupportedOperationException("This method is not supported on AIO"); + } + + public long size() throws Exception + { + if (aioFile == null) + { + return getFile().length(); + } + else + { + return aioFile.size(); + } + } + + @Override + public String toString() + { + return "AIOSequentialFile:" + getFile().getAbsolutePath(); + } + + // Public methods + // ----------------------------------------------------------------------------------------------------- + + @Override + public void onIOException(Exception code, String message) + { + factory.onIOError(code, message, this); + } + + + public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception + { + if (sync) + { + SimpleWaitIOCallback 
completion = new SimpleWaitIOCallback(); + + writeDirect(bytes, true, completion); + + completion.waitCompletion(); + } + else + { + writeDirect(bytes, false, DummyCallback.getInstance()); + } + } + + /** + * + * @param sync Not used on AIO + * */ + public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) + { + final int bytesToWrite = factory.calculateBlockSize(bytes.limit()); + + final long positionToWrite = position.getAndAdd(bytesToWrite); + + aioFile.write(positionToWrite, bytesToWrite, bytes, callback); + } + + public void writeInternal(final ByteBuffer bytes) throws HornetQException + { + final int bytesToWrite = factory.calculateBlockSize(bytes.limit()); + + final long positionToWrite = position.getAndAdd(bytesToWrite); + + aioFile.writeInternal(positionToWrite, bytesToWrite, bytes); + } + + // Protected methods + // ----------------------------------------------------------------------------------------------------- + + @Override + protected ByteBuffer newBuffer(int size, int limit) + { + size = factory.calculateBlockSize(size); + limit = factory.calculateBlockSize(limit); + + ByteBuffer buffer = factory.newBuffer(size); + buffer.limit(limit); + return buffer; + } + + // Private methods + // ----------------------------------------------------------------------------------------------------- + + private void checkOpened() + { + if (aioFile == null || !opened) + { + throw new IllegalStateException("File not opened"); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AIOSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AIOSequentialFileFactory.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AIOSequentialFileFactory.java new file mode 100644 index 0000000..eb46fd3 
--- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AIOSequentialFileFactory.java @@ -0,0 +1,358 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.nio.ByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq6.api.core.HornetQInterruptedException; +import org.apache.activemq6.core.asyncio.BufferCallback; +import org.apache.activemq6.core.asyncio.impl.AsynchronousFileImpl; +import org.apache.activemq6.core.journal.IOCriticalErrorListener; +import org.apache.activemq6.core.journal.SequentialFile; +import org.apache.activemq6.core.libaio.Native; +import org.apache.activemq6.journal.HornetQJournalLogger; +import org.apache.activemq6.utils.HornetQThreadFactory; + +/** + * A AIOSequentialFileFactory + * + * @author [email protected] + */ +public final class AIOSequentialFileFactory extends AbstractSequentialFileFactory +{ + private static final boolean trace = HornetQJournalLogger.LOGGER.isTraceEnabled(); + + private final ReuseBuffersController buffersControl = new ReuseBuffersController(); + + private ExecutorService pollerExecutor; + + // This method exists just 
to make debug easier. + // I could replace log.trace by log.info temporarily while I was debugging + // Journal + private static void trace(final String message) + { + HornetQJournalLogger.LOGGER.trace(message); + } + + public AIOSequentialFileFactory(final String journalDir) + { + this(journalDir, + JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, + JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, + false, + null); + } + + public AIOSequentialFileFactory(final String journalDir, final IOCriticalErrorListener listener) + { + this(journalDir, + JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, + JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, + false, + listener); + } + + public AIOSequentialFileFactory(final String journalDir, + final int bufferSize, + final int bufferTimeout, + final boolean logRates) + { + this(journalDir, bufferSize, bufferTimeout, logRates, null); + } + + public AIOSequentialFileFactory(final String journalDir, + final int bufferSize, + final int bufferTimeout, + final boolean logRates, + final IOCriticalErrorListener listener) + { + super(journalDir, true, bufferSize, bufferTimeout, logRates, listener); + } + + public SequentialFile createSequentialFile(final String fileName, final int maxIO) + { + return new AIOSequentialFile(this, + bufferSize, + bufferTimeout, + journalDir, + fileName, + maxIO, + buffersControl.callback, + writeExecutor, + pollerExecutor); + } + + public boolean isSupportsCallbacks() + { + return true; + } + + public static boolean isSupported() + { + return AsynchronousFileImpl.isLoaded(); + } + + public ByteBuffer allocateDirectBuffer(final int size) + { + + int blocks = size / 512; + if (size % 512 != 0) + { + blocks++; + } + + // The buffer on AIO has to be a multiple of 512 + ByteBuffer buffer = AsynchronousFileImpl.newBuffer(blocks * 512); + + buffer.limit(size); + + return buffer; + } + + public void releaseDirectBuffer(final ByteBuffer buffer) + { + Native.destroyBuffer(buffer); + } + + public ByteBuffer 
newBuffer(int size) + { + if (size % 512 != 0) + { + size = (size / 512 + 1) * 512; + } + + return buffersControl.newBuffer(size); + } + + public void clearBuffer(final ByteBuffer directByteBuffer) + { + AsynchronousFileImpl.clearBuffer(directByteBuffer); + } + + public int getAlignment() + { + return 512; + } + + // For tests only + public ByteBuffer wrapBuffer(final byte[] bytes) + { + ByteBuffer newbuffer = newBuffer(bytes.length); + newbuffer.put(bytes); + return newbuffer; + } + + public int calculateBlockSize(final int position) + { + int alignment = getAlignment(); + + int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment; + + return pos; + } + + /* (non-Javadoc) + * @see org.apache.activemq6.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer) + */ + @Override + public synchronized void releaseBuffer(final ByteBuffer buffer) + { + Native.destroyBuffer(buffer); + } + + @Override + public void start() + { + super.start(); + + pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this), + true, + AIOSequentialFileFactory.getThisClassLoader())); + + } + + @Override + public void stop() + { + buffersControl.stop(); + + if (pollerExecutor != null) + { + pollerExecutor.shutdown(); + + try + { + if (!pollerExecutor.awaitTermination(AbstractSequentialFileFactory.EXECUTOR_TIMEOUT, TimeUnit.SECONDS)) + { + HornetQJournalLogger.LOGGER.timeoutOnPollerShutdown(new Exception("trace")); + } + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + } + + super.stop(); + } + + @Override + protected void finalize() + { + stop(); + } + + /** + * Class that will control buffer-reuse + */ + private class ReuseBuffersController + { + private volatile long bufferReuseLastTime = System.currentTimeMillis(); + + /** + * This queue is fed by {@link 
org.apache.activemq6.core.journal.impl.AIOSequentialFileFactory.ReuseBuffersController.LocalBufferCallback} + * which is called directly by NIO or NIO. On the case of the AIO this is almost called by the native layer as + * soon as the buffer is not being used any more and ready to be reused or GCed + */ + private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffersQueue = new ConcurrentLinkedQueue<ByteBuffer>(); + + private boolean stopped = false; + + final BufferCallback callback = new LocalBufferCallback(); + + public ByteBuffer newBuffer(final int size) + { + // if a new buffer wasn't requested in 10 seconds, we clear the queue + // This is being done this way as we don't need another Timeout Thread + // just to cleanup this + if (bufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000) + { + if (AIOSequentialFileFactory.trace) + { + AIOSequentialFileFactory.trace("Clearing reuse buffers queue with " + reuseBuffersQueue.size() + + " elements"); + } + + bufferReuseLastTime = System.currentTimeMillis(); + + clearPoll(); + } + + // if a buffer is bigger than the configured-bufferSize, we just create a new + // buffer. + if (size > bufferSize) + { + return AsynchronousFileImpl.newBuffer(size); + } + else + { + // We need to allocate buffers following the rules of the storage + // being used (AIO/NIO) + int alignedSize = calculateBlockSize(size); + + // Try getting a buffer from the queue... + ByteBuffer buffer = reuseBuffersQueue.poll(); + + if (buffer == null) + { + // if empty create a new one. 
+ buffer = AsynchronousFileImpl.newBuffer(bufferSize); + + buffer.limit(alignedSize); + } + else + { + clearBuffer(buffer); + + // set the limit of the buffer to the bufferSize being required + buffer.limit(alignedSize); + } + + buffer.rewind(); + + return buffer; + } + } + + public synchronized void stop() + { + stopped = true; + clearPoll(); + } + + public synchronized void clearPoll() + { + ByteBuffer reusedBuffer; + + while ((reusedBuffer = reuseBuffersQueue.poll()) != null) + { + releaseBuffer(reusedBuffer); + } + } + + private class LocalBufferCallback implements BufferCallback + { + public void bufferDone(final ByteBuffer buffer) + { + synchronized (ReuseBuffersController.this) + { + + if (stopped) + { + releaseBuffer(buffer); + } + else + { + bufferReuseLastTime = System.currentTimeMillis(); + + // If a buffer has any other than the configured bufferSize, the buffer + // will be just sent to GC + if (buffer.capacity() == bufferSize) + { + reuseBuffersQueue.offer(buffer); + } + else + { + releaseBuffer(buffer); + } + } + } + } + } + } + + private static ClassLoader getThisClassLoader() + { + return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() + { + public ClassLoader run() + { + return AIOSequentialFileFactory.class.getClassLoader(); + } + }); + + } + + @Override + public String toString() + { + return AIOSequentialFileFactory.class.getSimpleName() + "(buffersControl.stopped=" + buffersControl.stopped + + "):" + super.toString(); + } +}
