http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/impl/AsynchronousFileImpl.java
----------------------------------------------------------------------
diff --git 
a/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/impl/AsynchronousFileImpl.java
 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/impl/AsynchronousFileImpl.java
new file mode 100644
index 0000000..9ee4b0a
--- /dev/null
+++ 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/impl/AsynchronousFileImpl.java
@@ -0,0 +1,819 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.asyncio.impl;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.FileLock;
+import java.util.PriorityQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.HornetQExceptionType;
+import org.apache.activemq6.core.asyncio.AIOCallback;
+import org.apache.activemq6.core.asyncio.AsynchronousFile;
+import org.apache.activemq6.core.asyncio.BufferCallback;
+import org.apache.activemq6.core.asyncio.IOExceptionListener;
+import org.apache.activemq6.core.libaio.Native;
+import org.apache.activemq6.journal.HornetQJournalLogger;
+import org.apache.activemq6.utils.ReusableLatch;
+
+/**
+ * AsynchronousFile implementation
+ *
+ * @author [email protected]
+ *         Warning: if you rename this class or move it to another package,
+ *         make sure you also rename the C++ native calls.
+ */
+public class AsynchronousFileImpl implements AsynchronousFile
+{
+   // Static 
----------------------------------------------------------------------------
+
+   private static final AtomicInteger totalMaxIO = new AtomicInteger(0);
+
+   private static boolean loaded = false;
+
+   /**
+    * This definition needs to match Version.h on the native sources.
+    * <p/>
+    * Or else the native module won't be loaded because of version mismatches
+    */
+   private static final int EXPECTED_NATIVE_VERSION = 52;
+
+   /**
+    * Used to determine the next writing sequence
+    */
+   private final AtomicLong nextWritingSequence = new AtomicLong(0);
+
+   /**
+    * Used to determine the sequence of the next callback that may be delivered.
+    * This is accessed from a single thread (the Poller Thread)
+    */
+   private long nextReadSequence = 0;
+
+   /**
+    * AIO can't guarantee ordering over callbacks.
+    * <p/>
+    * We use this {@link PriorityQueue} to hold values until they are in order
+    */
+   private final PriorityQueue<CallbackHolder> pendingCallbacks = new 
PriorityQueue<CallbackHolder>();
+
+   /**
+    * Adds {@code io} to the global {@code totalMaxIO} counter.
+    *
+    * @param io the amount to add; {@code close()} passes a negative value to subtract
+    */
+   public static void addMax(final int io)
+   {
+      AsynchronousFileImpl.totalMaxIO.addAndGet(io);
+   }
+
+   /**
+    * Returns the current value of the global {@code totalMaxIO} counter.
+    * <p/>
+    * For test purposes
+    */
+   public static int getTotalMaxIO()
+   {
+      return AsynchronousFileImpl.totalMaxIO.get();
+   }
+
+   /**
+    * Unconditionally sets the global {@code totalMaxIO} counter back to zero.
+    */
+   public static void resetMaxAIO()
+   {
+      AsynchronousFileImpl.totalMaxIO.set(0);
+   }
+
+   /**
+    * Delegates to {@code Native.openFile}.
+    *
+    * @param fileName name of the file to open
+    * @return the value returned by the native call (presumably a file handle usable with
+    *         {@code closeFile} and {@code lock} - TODO confirm)
+    */
+   public static int openFile(String fileName)
+   {
+      return Native.openFile(fileName);
+   }
+
+   /**
+    * Delegates to {@code Native.closeFile}.
+    *
+    * @param handle the handle to close
+    */
+   public static void closeFile(int handle)
+   {
+      Native.closeFile(handle);
+   }
+
+   /**
+    * Delegates to {@code Native.destroyBuffer}.
+    *
+    * @param buffer the buffer to destroy (presumably one created by {@code newBuffer} - TODO confirm)
+    */
+   public static void destroyBuffer(ByteBuffer buffer)
+   {
+      Native.destroyBuffer(buffer);
+   }
+
+   private static boolean loadLibrary(final String name)
+   {
+      try
+      {
+         HornetQJournalLogger.LOGGER.trace(name + " being loaded");
+         System.loadLibrary(name);
+         if (Native.getNativeVersion() != 
AsynchronousFileImpl.EXPECTED_NATIVE_VERSION)
+         {
+            HornetQJournalLogger.LOGGER.incompatibleNativeLibrary();
+            return false;
+         }
+         else
+         {
+            return true;
+         }
+      }
+      catch (Throwable e)
+      {
+         HornetQJournalLogger.LOGGER.debug(name + " -> error loading the 
native library", e);
+         return false;
+      }
+
+   }
+
+   static
+   {
+      String[] libraries = new String[]{"HornetQAIO", "HornetQAIO64", 
"HornetQAIO32", "HornetQAIO_ia64"};
+
+      for (String library : libraries)
+      {
+         if (AsynchronousFileImpl.loadLibrary(library))
+         {
+            AsynchronousFileImpl.loaded = true;
+            break;
+         }
+         else
+         {
+            HornetQJournalLogger.LOGGER.debug("Library " + library + " not 
found!");
+         }
+      }
+
+      if (!AsynchronousFileImpl.loaded)
+      {
+         HornetQJournalLogger.LOGGER.debug("Couldn't locate LibAIO Wrapper");
+      }
+   }
+
+   /**
+    * @return {@code true} when the static initializer managed to load one of the native libraries
+    *         with the expected version
+    */
+   public static boolean isLoaded()
+   {
+      return AsynchronousFileImpl.loaded;
+   }
+
+   // Attributes 
------------------------------------------------------------------------
+
+   private boolean opened = false;
+
+   private String fileName;
+
+   /**
+    * Used while inside the callbackDone and callbackError
+    */
+   private final Lock callbackLock = new ReentrantLock();
+
+   private final ReusableLatch pollerLatch = new ReusableLatch();
+
+   private volatile Runnable poller;
+
+   private int maxIO;
+
+   private final Lock writeLock = new ReentrantReadWriteLock().writeLock();
+
+   private final ReusableLatch pendingWrites = new ReusableLatch();
+
+   private Semaphore maxIOSemaphore;
+
+   private BufferCallback bufferCallback;
+
+   /**
+    * A callback for IO errors when they happen
+    */
+   private final IOExceptionListener ioExceptionListener;
+
+   /**
+    * Warning: Beware of the C++ pointer! It will bite you! :-)
+    */
+   private ByteBuffer handler;
+
+   // A context switch on AIO would make it to synchronize the disk before
+   // switching to the new thread, what would cause
+   // serious performance problems. Because of that we make all the writes on
+   // AIO using a single thread.
+   private final Executor writeExecutor;
+
+   private final Executor pollerExecutor;
+
+   // AsynchronousFile implementation 
---------------------------------------------------
+
+   /**
+    * @param writeExecutor       It needs to be a single Thread executor. If null it will use the
+    *                            user thread to execute write operations
+    * @param pollerExecutor      The thread pool that will initialize poller handlers
+    * @param ioExceptionListener notified of IO errors raised by this file; may be {@code null}, in
+    *                            which case errors are only logged
+    */
+   public AsynchronousFileImpl(final Executor writeExecutor, final Executor 
pollerExecutor, final IOExceptionListener ioExceptionListener)
+   {
+      this.writeExecutor = writeExecutor;
+      this.pollerExecutor = pollerExecutor;
+      this.ioExceptionListener = ioExceptionListener;
+   }
+
+   /**
+    * Same as the three-argument constructor, without an IO exception listener.
+    */
+   public AsynchronousFileImpl(final Executor writeExecutor, final Executor 
pollerExecutor)
+   {
+      this(writeExecutor, pollerExecutor, null);
+   }
+
+   /**
+    * Opens {@code fileName1} through the native layer, allowing at most {@code maxIOArgument}
+    * requests in flight.
+    * <p/>
+    * On success the file is marked as opened, {@code maxIOArgument} is added to the global
+    * {@code totalMaxIO} counter and both sequence counters are reset to zero.
+    *
+    * @throws IllegalStateException if this file is already opened
+    * @throws HornetQException      if the native initialization fails; a
+    *                               {@code NATIVE_ERROR_CANT_INITIALIZE_AIO} failure is rethrown with a
+    *                               message that includes the current AIO usage
+    */
+   public void open(final String fileName1, final int maxIOArgument) throws HornetQException
+   {
+      writeLock.lock();
+
+      try
+      {
+         if (opened)
+         {
+            throw new IllegalStateException("AsynchronousFile is already opened");
+         }
+
+         this.maxIO = maxIOArgument;
+         maxIOSemaphore = new Semaphore(this.maxIO);
+
+         this.fileName = fileName1;
+
+         try
+         {
+            handler = Native.init(AsynchronousFileImpl.class, fileName1, this.maxIO, HornetQJournalLogger.LOGGER);
+         }
+         catch (HornetQException e)
+         {
+            if (e.getType() != HornetQExceptionType.NATIVE_ERROR_CANT_INITIALIZE_AIO)
+            {
+               throw e;
+            }
+
+            // Same error type, but with a message that tells how much AIO is already in use
+            final String detail = "Can't initialize AIO. Currently AIO in use = " + AsynchronousFileImpl.totalMaxIO.get() +
+               ", trying to allocate more " +
+               maxIOArgument;
+            throw new HornetQException(e.getType(), detail, e);
+         }
+
+         opened = true;
+         AsynchronousFileImpl.addMax(this.maxIO);
+         nextWritingSequence.set(0);
+         nextReadSequence = 0;
+      }
+      finally
+      {
+         writeLock.unlock();
+      }
+   }
+
+   /**
+    * Waits for every pending request to complete, stops the poller and closes the native file.
+    * <p/>
+    * The opened check is done while holding {@code writeLock} (as {@code open} does), so that two
+    * racing {@code close()} calls cannot both get past it: the loser would otherwise dereference
+    * the already cleared {@code maxIOSemaphore}.
+    *
+    * @throws RuntimeException     if the file is not opened
+    * @throws InterruptedException if interrupted while waiting for pending requests or the poller
+    * @throws HornetQException     if the native layer fails
+    */
+   public void close() throws InterruptedException, HornetQException
+   {
+      writeLock.lock();
+
+      try
+      {
+         checkOpened();
+
+         // Wait for everything that was submitted; complain once a minute while waiting
+         while (!pendingWrites.await(60000))
+         {
+            HornetQJournalLogger.LOGGER.couldNotGetLock(fileName);
+         }
+
+         // Taking all the permits guarantees no request is still in flight
+         while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
+         {
+            HornetQJournalLogger.LOGGER.couldNotGetLock(fileName);
+         }
+
+         maxIOSemaphore = null;
+         if (poller != null)
+         {
+            stopPoller();
+         }
+
+         if (handler != null)
+         {
+            Native.closeInternal(handler);
+            AsynchronousFileImpl.addMax(-maxIO);
+         }
+         opened = false;
+         handler = null;
+      }
+      finally
+      {
+         writeLock.unlock();
+      }
+   }
+
+
+   /**
+    * Writes through {@code Native.writeInternal} on the calling thread.
+    * <p/>
+    * A {@link HornetQException} from the native call is reported to the IO exception listener and
+    * then rethrown. After a successful call the buffer is handed to the {@link BufferCallback}, when
+    * one is set. Note that this method does not call {@code checkOpened()}.
+    */
+   public void writeInternal(long positionToWrite, long size, ByteBuffer 
bytes) throws HornetQException
+   {
+      try
+      {
+         Native.writeInternal(handler, positionToWrite, size, bytes);
+      }
+      catch (HornetQException e)
+      {
+         fireExceptionListener(e.getType().getCode(), e.getMessage());
+         throw e;
+      }
+      if (bufferCallback != null)
+      {
+         bufferCallback.bufferDone(bytes);
+      }
+   }
+
+
+   public void write(final long position,
+                     final long size,
+                     final ByteBuffer directByteBuffer,
+                     final AIOCallback aioCallback)
+   {
+      if (aioCallback == null)
+      {
+         throw new NullPointerException("Null Callback");
+      }
+
+      checkOpened();
+      if (poller == null)
+      {
+         startPoller();
+      }
+
+      pendingWrites.countUp();
+
+      if (writeExecutor != null)
+      {
+         maxIOSemaphore.acquireUninterruptibly();
+
+         writeExecutor.execute(new Runnable()
+         {
+            public void run()
+            {
+               long sequence = nextWritingSequence.getAndIncrement();
+
+               try
+               {
+                  Native.write(AsynchronousFileImpl.this, handler, sequence, 
position, size, directByteBuffer, aioCallback);
+               }
+               catch (HornetQException e)
+               {
+                  callbackError(aioCallback, sequence, directByteBuffer, 
e.getType().getCode(), e.getMessage());
+               }
+               catch (RuntimeException e)
+               {
+                  callbackError(aioCallback,
+                                sequence,
+                                directByteBuffer,
+                                HornetQExceptionType.INTERNAL_ERROR.getCode(),
+                                e.getMessage());
+               }
+            }
+         });
+      }
+      else
+      {
+         maxIOSemaphore.acquireUninterruptibly();
+
+         long sequence = nextWritingSequence.getAndIncrement();
+
+         try
+         {
+            Native.write(this, handler, sequence, position, size, 
directByteBuffer, aioCallback);
+         }
+         catch (HornetQException e)
+         {
+            callbackError(aioCallback, sequence, directByteBuffer, 
e.getType().getCode(), e.getMessage());
+         }
+         catch (RuntimeException e)
+         {
+            callbackError(aioCallback, sequence, directByteBuffer, 
HornetQExceptionType.INTERNAL_ERROR.getCode(), e.getMessage());
+         }
+      }
+
+   }
+
+   /**
+    * Submits an asynchronous read into {@code directByteBuffer}; completion is reported through
+    * {@code aioPackage}.
+    * <p/>
+    * A pending request is registered and one permit of {@code maxIOSemaphore} is taken before the
+    * native call. Both are given back here only when the native call throws.
+    *
+    * @throws HornetQException if the native read fails
+    * @throws RuntimeException if the file is not opened, or if the native call throws one
+    */
+   public void read(final long position,
+                    final long size,
+                    final ByteBuffer directByteBuffer,
+                    final AIOCallback aioPackage) throws HornetQException
+   {
+      checkOpened();
+      if (poller == null)
+      {
+         startPoller();
+      }
+      pendingWrites.countUp();
+      maxIOSemaphore.acquireUninterruptibly();
+      try
+      {
+         Native.read(this, handler, position, size, directByteBuffer, aioPackage);
+      }
+      catch (HornetQException nativeFailure)
+      {
+         undoReadAccounting();
+         throw nativeFailure;
+      }
+      catch (RuntimeException unexpected)
+      {
+         undoReadAccounting();
+         throw unexpected;
+      }
+   }
+
+   /**
+    * Gives back the permit and the pending count taken by {@code read} when the request failed.
+    */
+   private void undoReadAccounting()
+   {
+      maxIOSemaphore.release();
+      pendingWrites.countDown();
+   }
+
+   /**
+    * @return the value of {@code Native.size0} for this file's handler (presumably the file size in
+    *         bytes - TODO confirm)
+    * @throws RuntimeException if the file is not opened
+    */
+   public long size() throws HornetQException
+   {
+      checkOpened();
+      return Native.size0(handler);
+   }
+
+   /**
+    * Delegates to {@code Native.fill} on the calling thread.
+    * <p/>
+    * A {@link HornetQException} from the native call is reported to the IO exception listener and
+    * then rethrown.
+    *
+    * @param position passed straight to the native call
+    * @param blocks   passed straight to the native call
+    * @param size     passed straight to the native call
+    * @param fillChar passed straight to the native call (presumably the byte used to fill - TODO confirm)
+    * @throws RuntimeException if the file is not opened
+    */
+   public void fill(final long position, final int blocks, final long size, 
final byte fillChar) throws HornetQException
+   {
+      checkOpened();
+      try
+      {
+         Native.fill(handler, position, blocks, size, fillChar);
+      }
+      catch (HornetQException e)
+      {
+         fireExceptionListener(e.getType().getCode(), e.getMessage());
+         throw e;
+      }
+   }
+
+   /**
+    * @return the fixed block size, 512; this is the same value {@code newBuffer} requires buffer
+    *         sizes to be a multiple of
+    */
+   public int getBlockSize()
+   {
+      return 512;
+   }
+
+   /**
+    * Allocates a buffer of {@code size} bytes through {@code Native.newNativeBuffer}.
+    * <p/>
+    * This needs to be synchronized because of
+    * http://bugs.sun.com/view_bug.do?bug_id=6791815
+    * http://mail.openjdk.java.net/pipermail/hotspot-runtime-dev/2009-January/000386.html
+    *
+    * @param size requested size; must be a multiple of 512
+    * @throws RuntimeException if {@code size} is not a multiple of 512
+    */
+   public static synchronized ByteBuffer newBuffer(final int size)
+   {
+      if (size % 512 != 0)
+      {
+         throw new RuntimeException("Buffer size needs to be aligned to 512");
+      }
+
+      return Native.newNativeBuffer(size);
+   }
+
+   /**
+    * Sets the callback that receives a buffer once this file is done with it; it is invoked from
+    * {@code writeInternal}, {@code callbackDone} and {@code callbackError}.
+    *
+    * @param callback the callback, or {@code null} for none
+    */
+   public void setBufferCallback(final BufferCallback callback)
+   {
+      bufferCallback = callback;
+   }
+
+   /**
+    * Return the JNI handler used on C++.
+    * <p/>
+    * The value is assigned by {@code open} and set to {@code null} by {@code close}.
+    */
+   public ByteBuffer getHandler()
+   {
+      return handler;
+   }
+
+   /**
+    * Calls {@code Native.resetBuffer} with the buffer and its current limit (presumably clearing
+    * that many bytes - TODO confirm), then moves the buffer's position back to 0.
+    */
+   public static void clearBuffer(final ByteBuffer buffer)
+   {
+      Native.resetBuffer(buffer, buffer.limit());
+      buffer.position(0);
+   }
+
+   // Protected 
-------------------------------------------------------------------------
+
+   /**
+    * Only logs a warning when the instance is collected while still opened; nothing is closed here.
+    */
+   @Override
+   protected void finalize()
+   {
+      if (opened)
+      {
+         HornetQJournalLogger.LOGGER.fileFinalizedWhileOpen(fileName);
+      }
+   }
+
+   // Private 
---------------------------------------------------------------------------
+
+   /**
+    * Completion notification for a request. There is no Java caller in this class; presumably it is
+    * invoked by the JNI layer, like {@code callbackError} - TODO confirm.
+    * <p/>
+    * Releases one permit of {@code maxIOSemaphore} and one pending-write count, then delivers
+    * {@code callback.done()} in sequence order while holding {@code callbackLock}.
+    *
+    * @param callback the callback to complete
+    * @param sequence the sequence given to the request, or -1 when ordering does not apply
+    * @param buffer   buffer handed to the {@link BufferCallback}; may be {@code null}
+    */
+   private void callbackDone(final AIOCallback callback, final long sequence, 
final ByteBuffer buffer)
+   {
+      maxIOSemaphore.release();
+
+      pendingWrites.countDown();
+
+      callbackLock.lock();
+
+      try
+      {
+
+         if (sequence == -1)
+         {
+            callback.done();
+         }
+         else
+         {
+            if (sequence == nextReadSequence)
+            {
+               nextReadSequence++;
+               callback.done();
+               flushCallbacks();
+            }
+            else
+            {
+               pendingCallbacks.add(new CallbackHolder(sequence, callback));
+            }
+         }
+
+         // The buffer is not sent on callback for read operations
+         if (bufferCallback != null && buffer != null)
+         {
+            bufferCallback.bufferDone(buffer);
+         }
+      }
+      finally
+      {
+         callbackLock.unlock();
+      }
+   }
+
+   private void flushCallbacks()
+   {
+      while (!pendingCallbacks.isEmpty() && pendingCallbacks.peek().sequence 
== nextReadSequence)
+      {
+         CallbackHolder holder = pendingCallbacks.poll();
+         if (holder.isError())
+         {
+            ErrorCallback error = (ErrorCallback) holder;
+            holder.callback.onError(error.errorCode, error.message);
+         }
+         else
+         {
+            holder.callback.done();
+         }
+         nextReadSequence++;
+      }
+   }
+
+   // Called by the JNI layer.. just ignore the
+   // warning
+   // It is also called from write(...) in this class when the native write throws.
+   //
+   // Logs the error, notifies the IO exception listener, releases one permit of maxIOSemaphore
+   // and one pending-write count, then delivers callback.onError in sequence order while holding
+   // callbackLock (sequence -1 is delivered immediately). Unlike callbackDone, the buffer
+   // callback is invoked after callbackLock has been released.
+   private void callbackError(final AIOCallback callback,
+                              final long sequence,
+                              final ByteBuffer buffer,
+                              final int errorCode,
+                              final String errorMessage)
+   {
+      HornetQJournalLogger.LOGGER.callbackError(errorMessage);
+
+      fireExceptionListener(errorCode, errorMessage);
+
+      maxIOSemaphore.release();
+
+      pendingWrites.countDown();
+
+      callbackLock.lock();
+
+      try
+      {
+         if (sequence == -1)
+         {
+            callback.onError(errorCode, errorMessage);
+         }
+         else
+         {
+            if (sequence == nextReadSequence)
+            {
+               nextReadSequence++;
+               callback.onError(errorCode, errorMessage);
+               flushCallbacks();
+            }
+            else
+            {
+               pendingCallbacks.add(new ErrorCallback(sequence, callback, 
errorCode, errorMessage));
+            }
+         }
+      }
+      finally
+      {
+         callbackLock.unlock();
+      }
+
+      // The buffer is not sent on callback for read operations
+      if (bufferCallback != null && buffer != null)
+      {
+         bufferCallback.bufferDone(buffer);
+      }
+   }
+
+   /**
+    * Logs an IO error and, when a listener was given at construction, notifies it with an
+    * exception built from the error code.
+    * <p/>
+    * This is called by the native layer, and also from {@code writeInternal}, {@code fill} and
+    * {@code callbackError} in this class.
+    *
+    * @param errorCode    code resolved through {@code HornetQExceptionType.getType}
+    * @param errorMessage message used both for the created exception and for the listener
+    */
+   private void fireExceptionListener(final int errorCode, final String 
errorMessage)
+   {
+      HornetQJournalLogger.LOGGER.ioError(errorCode, errorMessage);
+      if (ioExceptionListener != null)
+      {
+         
ioExceptionListener.onIOException(HornetQExceptionType.getType(errorCode).createException(errorMessage),
 errorMessage);
+      }
+   }
+
+   /**
+    * Body of the poller task: returns at once when the file is not opened, otherwise hands control
+    * to {@code Native.internalPollEvents} (presumably blocking until {@code Native.stopPoller} is
+    * called - TODO confirm).
+    */
+   private void pollEvents()
+   {
+      if (!opened)
+      {
+         return;
+      }
+      Native.internalPollEvents(handler);
+   }
+
+   private void startPoller()
+   {
+      writeLock.lock();
+
+      try
+      {
+
+         if (poller == null)
+         {
+            pollerLatch.countUp();
+            poller = new PollerRunnable();
+            try
+            {
+               pollerExecutor.execute(poller);
+            }
+            catch (Exception ex)
+            {
+               HornetQJournalLogger.LOGGER.errorStartingPoller(ex);
+            }
+         }
+      }
+      finally
+      {
+         writeLock.unlock();
+      }
+   }
+
+   /**
+    * @throws RuntimeException if the file is not currently opened
+    */
+   private void checkOpened()
+   {
+      if (!opened)
+      {
+         throw new RuntimeException("File is not opened");
+      }
+   }
+
+   /**
+    * Asks the native layer to stop polling and waits until the poller task has finished.
+    *
+    * @throws HornetQException     declared for the native call
+    * @throws InterruptedException if interrupted while waiting for the poller to finish
+    */
+   private void stopPoller() throws HornetQException, InterruptedException
+   {
+      Native.stopPoller(handler);
+      // We need to make sure we won't call close until Poller is
+      // completely done, or we might get beautiful GPFs
+      pollerLatch.await();
+   }
+
+   public static FileLock lock(int handle)
+   {
+      if (Native.flock(handle))
+      {
+         return new HornetQFileLock(handle);
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   // Native 
----------------------------------------------------------------------------
+
+
+   /**
+    * A callback parked in {@code pendingCallbacks} until its sequence becomes the next one to deliver.
+    * <p/>
+    * Explicitly adding a compare to clause that returns 0 for at least the same object.
+    * <p/>
+    * If {@link Comparable#compareTo(Object)} does not return 0 -for at least the same object- some
+    * Collection classes methods will fail (example {@link PriorityQueue#remove(Object)}. If it
+    * returns 0, then {@link #equals(Object)} must return {@code true} for the exact same cases,
+    * otherwise we will get compatibility problems between Java5 and Java6.
+    */
+   private static class CallbackHolder implements Comparable<CallbackHolder>
+   {
+      final long sequence;
+
+      final AIOCallback callback;
+
+      public boolean isError()
+      {
+         return false;
+      }
+
+      public CallbackHolder(final long sequence, final AIOCallback callback)
+      {
+         this.sequence = sequence;
+         this.callback = callback;
+      }
+
+      public int compareTo(final CallbackHolder o)
+      {
+         // Only the very same object compares as equal (0), which keeps compareTo consistent
+         // with the identity-based equals below.
+         // NOTE(review): two distinct holders with the same sequence compare as -1 in both
+         // directions; this relies on sequences never repeating among queued holders.
+         if (this == o)
+            return 0;
+         if (sequence <= o.sequence)
+         {
+            return -1;
+         }
+         else
+         {
+            return 1;
+         }
+      }
+
+      /**
+       * See {@link CallbackHolder}.
+       */
+      @Override
+      public int hashCode()
+      {
+         return super.hashCode();
+      }
+
+      /**
+       * See {@link CallbackHolder}.
+       */
+      @Override
+      public boolean equals(Object obj)
+      {
+         return super.equals(obj);
+      }
+   }
+
+   /**
+    * A parked callback that must be completed with {@code onError(errorCode, message)} instead of
+    * {@code done()}; {@code flushCallbacks} tells the two apart through {@link #isError()}.
+    */
+   private static final class ErrorCallback extends CallbackHolder
+   {
+      final int errorCode;
+
+      final String message;
+
+      @Override
+      public boolean isError()
+      {
+         return true;
+      }
+
+      public ErrorCallback(final long sequence, final AIOCallback callback, 
final int errorCode, final String message)
+      {
+         super(sequence, callback);
+
+         this.errorCode = errorCode;
+
+         this.message = message;
+      }
+
+      /**
+       * See {@link CallbackHolder}.
+       */
+      @Override
+      public int hashCode()
+      {
+         return super.hashCode();
+      }
+
+      /**
+       * See {@link CallbackHolder}.
+       */
+      @Override
+      public boolean equals(Object obj)
+      {
+         return super.equals(obj);
+      }
+   }
+
+   /**
+    * The task submitted to {@code pollerExecutor}: runs {@code pollEvents()} and, however that
+    * ends, clears the {@code poller} field and counts {@code pollerLatch} down.
+    */
+   private class PollerRunnable implements Runnable
+   {
+      PollerRunnable()
+      {
+      }
+
+      public void run()
+      {
+         try
+         {
+            pollEvents();
+         }
+         finally
+         {
+            // This gives us extra protection in cases of interruption
+            // Case the poller thread is interrupted, this will allow us to
+            // restart the thread when required
+            poller = null;
+            pollerLatch.countDown();
+         }
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/impl/HornetQFileLock.java
----------------------------------------------------------------------
diff --git 
a/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/impl/HornetQFileLock.java
 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/impl/HornetQFileLock.java
new file mode 100644
index 0000000..ba2514a
--- /dev/null
+++ 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/impl/HornetQFileLock.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.asyncio.impl;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+
+import org.apache.activemq6.core.libaio.Native;
+
+/**
+ * A {@link FileLock} that wraps a native file handle instead of a {@link FileChannel}.
+ * <p/>
+ * Releasing the lock closes the handle through {@code Native.closeFile}. As required by the
+ * {@link FileLock} contract, the lock becomes invalid once released and releasing it again has no
+ * effect, so the native handle is never closed twice.
+ *
+ * @author clebertsuconic
+ */
+public class HornetQFileLock extends FileLock
+{
+
+   private final int handle;
+
+   /**
+    * Becomes {@code true} on the first {@link #release()}; written only while holding this object's monitor.
+    */
+   private volatile boolean released;
+
+   /**
+    * @param handle the native file handle this lock stands for
+    */
+   protected HornetQFileLock(final int handle)
+   {
+      super((FileChannel)null, 0, 0, false);
+      this.handle = handle;
+   }
+
+   /**
+    * @return {@code true} until {@link #release()} has been called
+    */
+   @Override
+   public boolean isValid()
+   {
+      return !released;
+   }
+
+   /**
+    * Closes the native handle the first time it is called; later calls do nothing.
+    */
+   @Override
+   public synchronized void release() throws IOException
+   {
+      if (released)
+      {
+         return;
+      }
+      // Flagged before closing: the handle must not be closed again even if the native call fails
+      released = true;
+      Native.closeFile(handle);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/EncodingSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/EncodingSupport.java
 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/EncodingSupport.java
new file mode 100644
index 0000000..f531434
--- /dev/null
+++ 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/EncodingSupport.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+
+/**
+ * This interface provides encoding support for the Journal.
+ *
+ * @author <a href="mailto:[email protected]">Clebert Suconic</a>
+ * @author <a href="mailto:[email protected]">Tim Fox</a>
+ */
+public interface EncodingSupport
+{
+   /**
+    * @return presumably the number of bytes {@link #encode(HornetQBuffer)} writes - TODO confirm
+    */
+   int getEncodeSize();
+
+   /**
+    * Encodes this object into {@code buffer}.
+    */
+   void encode(HornetQBuffer buffer);
+
+   /**
+    * Decodes this object from {@code buffer}.
+    */
+   void decode(HornetQBuffer buffer);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOAsyncTask.java
----------------------------------------------------------------------
diff --git 
a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOAsyncTask.java
 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOAsyncTask.java
new file mode 100644
index 0000000..e130481
--- /dev/null
+++ 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOAsyncTask.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal;
+
+import org.apache.activemq6.core.asyncio.AIOCallback;
+
+/**
+ * This interface is just a direct extension of AIOCallback,
+ * to avoid a direct dependency on org.apache.activemq6.core.asyncio.AIOCallback from the journal.
+ *
+ * @author <a href="mailto:[email protected]">Clebert Suconic</a>
+ */
+public interface IOAsyncTask extends AIOCallback
+{
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOCompletion.java
----------------------------------------------------------------------
diff --git 
a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOCompletion.java
 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOCompletion.java
new file mode 100644
index 0000000..f1d5915
--- /dev/null
+++ 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOCompletion.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal;
+
+/**
+ * An {@link IOAsyncTask} that additionally exposes {@link #storeLineUp()}.
+ *
+ * @author <a href="mailto:[email protected]">Clebert Suconic</a>
+ */
+public interface IOCompletion extends IOAsyncTask
+{
+   // NOTE(review): presumably called when a store operation is queued, before its completion is
+   // reported - confirm against the implementations.
+   void storeLineUp();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOCriticalErrorListener.java
----------------------------------------------------------------------
diff --git 
a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOCriticalErrorListener.java
 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOCriticalErrorListener.java
new file mode 100644
index 0000000..fba45e8
--- /dev/null
+++ 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/IOCriticalErrorListener.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal;
+
+/**
+ * Listener notified of IO errors, together with the file they relate to.
+ *
+ * @author clebert
+ */
+public interface IOCriticalErrorListener
+{
+   /**
+    * @param code    the exception describing the error (despite the parameter name, an exception, not a numeric code)
+    * @param message a description of the error
+    * @param file    presumably the file on which the error happened - TODO confirm
+    */
+   void onIOException(Exception code, String message, SequentialFile file);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/Journal.java
----------------------------------------------------------------------
diff --git 
a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/Journal.java
 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/Journal.java
new file mode 100644
index 0000000..85c65e9
--- /dev/null
+++ 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/Journal.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq6.core.journal.impl.JournalFile;
+import org.apache.activemq6.core.server.HornetQComponent;
+
+/**
+ * Contract of a record journal: non-transactional and transactional appends, loading, and the
+ * hooks used while a backup server is being synchronized.
+ * <p>
+ * Most methods on the journal provide a blocking version where you select the sync mode and a
+ * non-blocking version where you pass a completion callback as a parameter.
+ * <p>
+ * Notice also that even on the callback methods it's possible to pass the sync mode. That will
+ * only make sense on the NIO operations.
+ *
+ * @author Tim Fox
+ * @author Clebert Suconic
+ * @see org.apache.activemq6.utils.IDGenerator
+ */
+public interface Journal extends HornetQComponent
+{
+   /** States a journal can be in. */
+   enum JournalState
+   {
+      STOPPED,
+      /**
+       * The journal has some fields initialized and services running. But it is not fully
+       * operational. See {@link JournalState#LOADED}.
+       */
+      STARTED,
+      /**
+       * When a replicating server is still not synchronized with its live. So if the live stops,
+       * the backup may not fail-over and will stop as well.
+       */
+      SYNCING,
+      /**
+       * Journal is being used by a replicating server which is up-to-date with its live. That
+       * means that if the live stops, the backup can fail-over.
+       */
+      SYNCING_UP_TO_DATE,
+      /**
+       * The journal is fully operational. This is the state the journal should be when its server
+       * is live.
+       */
+      LOADED;
+   }
+
+   // Non transactional operations
+   //
+   // Each append comes in overloads that take the record either as a raw byte[] or as an
+   // EncodingSupport, and optionally an IOCompletion callback in addition to the sync flag.
+
+   void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) 
throws Exception;
+
+   void appendAddRecord(long id, byte recordType, EncodingSupport record, 
boolean sync) throws Exception;
+
+   void appendAddRecord(long id, byte recordType, EncodingSupport record, 
boolean sync, IOCompletion completionCallback) throws Exception;
+
+   void appendUpdateRecord(long id, byte recordType, byte[] record, boolean 
sync) throws Exception;
+
+   void appendUpdateRecord(long id, byte recordType, EncodingSupport record, 
boolean sync) throws Exception;
+
+   void appendUpdateRecord(long id,
+                           byte recordType,
+                           EncodingSupport record,
+                           boolean sync,
+                           IOCompletion completionCallback) throws Exception;
+
+   void appendDeleteRecord(long id, boolean sync) throws Exception;
+
+   void appendDeleteRecord(long id, boolean sync, IOCompletion 
completionCallback) throws Exception;
+
+   // Transactional operations
+   //
+   // The add/update/delete appends below carry a txID and take no sync flag; only the
+   // commit, prepare and rollback records accept one.
+
+   void appendAddRecordTransactional(long txID, long id, byte recordType, 
byte[] record) throws Exception;
+
+   void appendAddRecordTransactional(long txID, long id, byte recordType, 
EncodingSupport record) throws Exception;
+
+   void appendUpdateRecordTransactional(long txID, long id, byte recordType, 
byte[] record) throws Exception;
+
+   void appendUpdateRecordTransactional(long txID, long id, byte recordType, 
EncodingSupport record) throws Exception;
+
+   void appendDeleteRecordTransactional(long txID, long id, byte[] record) 
throws Exception;
+
+   void appendDeleteRecordTransactional(long txID, long id, EncodingSupport 
record) throws Exception;
+
+   void appendDeleteRecordTransactional(long txID, long id) throws Exception;
+
+   void appendCommitRecord(long txID, boolean sync) throws Exception;
+
+   void appendCommitRecord(long txID, boolean sync, IOCompletion callback) 
throws Exception;
+
+   /**
+    * Appends a commit record, letting the caller decide whether the completion context is
+    * lined up by this call.
+    *
+    * @param txID ID of the transaction the commit record is written for
+    * @param sync the sync mode (see the interface description)
+    * @param callback the completion callback
+    * @param lineUpContext if appendCommitRecord should call a storeLineUp. This is because the
+    *           caller may have already taken it into account
+    * @throws Exception
+    */
+   void appendCommitRecord(long txID, boolean sync, IOCompletion callback, 
boolean lineUpContext) throws Exception;
+
+   /**
+    * <p>If the system crashed after a prepare was called, it should store information that is
+    * required to bring the transaction back to a state it could be committed.</p>
+    *
+    * <p>transactionData allows you to store any other supporting user-data related to the
+    * transaction.</p>
+    *
+    * @param txID ID of the transaction being prepared
+    * @param transactionData extra user data for the prepare
+    * @param sync the sync mode (see the interface description)
+    * @throws Exception
+    */
+   void appendPrepareRecord(long txID, EncodingSupport transactionData, 
boolean sync) throws Exception;
+
+   void appendPrepareRecord(long txID, EncodingSupport transactionData, 
boolean sync, IOCompletion callback) throws Exception;
+
+   void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) 
throws Exception;
+
+   void appendRollbackRecord(long txID, boolean sync) throws Exception;
+
+   void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) 
throws Exception;
+
+   // Load
+
+   JournalLoadInformation load(LoaderCallback reloadManager) throws Exception;
+
+   /**
+    * Load internal data structures and not expose any data. This is only useful if you're using
+    * the journal but not interested on the current data. Useful in situations where the journal
+    * is being replicated, copied... etc.
+    */
+   JournalLoadInformation loadInternalOnly() throws Exception;
+
+   /**
+    * Load internal data structures, and remain waiting for synchronization to complete.
+    * @param state the current state of the journal, this parameter ensures consistency.
+    */
+   JournalLoadInformation loadSyncOnly(JournalState state) throws Exception;
+
+   // NOTE(review): presumably the "storeLineUp" step mentioned on
+   // appendCommitRecord(long, boolean, IOCompletion, boolean) - confirm against the implementation.
+   void lineUpContext(IOCompletion callback);
+
+   // List-based variant of load(LoaderCallback): the caller passes the lists and the
+   // failure callback instead of a single LoaderCallback.
+   JournalLoadInformation load(List<RecordInfo> committedRecords,
+                               List<PreparedTransactionInfo> 
preparedTransactions,
+                               TransactionFailureCallback transactionFailure) 
throws Exception;
+
+   int getAlignment() throws Exception;
+
+   int getNumberOfRecords();
+
+   int getUserVersion();
+
+   // NOTE(review): perfBlast / runDirectJournalBlast look like performance-testing hooks;
+   // their contract is not visible here - confirm before relying on them.
+   void perfBlast(int pages);
+
+   void runDirectJournalBlast() throws Exception;
+
+   /**
+    * Reserves journal file IDs, creates the necessary files for synchronization, and places
+    * references to these (reserved for sync) files in the map.
+    * <p>
+    * During the synchronization between a live server and backup, we reserve in the backup the
+    * journal file IDs used in the live server. This call also makes sure the files are created
+    * empty without any kind of headers added.
+    * @param fileIds IDs to reserve for synchronization
+    * @return map to be filled with id and journal file pairs for <b>synchronization</b>.
+    * @throws Exception
+    */
+   Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws 
Exception;
+
+   /**
+    * Write lock the Journal and write lock the compacting process. Necessary only during
+    * replication for backup synchronization.
+    */
+   void synchronizationLock();
+
+   /**
+    * Unlock the Journal and the compacting process.
+    * @see Journal#synchronizationLock()
+    */
+   void synchronizationUnlock();
+
+   /**
+    * Force the usage of a new {@link JournalFile}.
+    * @throws Exception
+    */
+   void forceMoveNextFile() throws Exception;
+
+   /**
+    * Returns the {@link JournalFile}s in use.
+    * @return array with all {@link JournalFile}s in use
+    */
+   JournalFile[] getDataFiles();
+
+   SequentialFileFactory getFileFactory();
+
+   int getFileSize();
+
+   /**
+    * This method will start compact using the compactorExecutor and block up to timeout seconds
+    * @param timeout the timeout in seconds or block forever if <= 0
+    * @throws Exception
+    */
+   void scheduleCompactAndBlock(int timeout) throws Exception;
+
+   /**
+    * Stops any operation that may delete or modify old (stale) data.
+    * <p>
+    * Meant to be used during synchronization of data between a live server and its replicating
+    * (remote) backup. Old files must not be compacted or deleted during synchronization.
+    */
+   void replicationSyncPreserveOldFiles();
+
+   /**
+    * Restarts file reclaim and compacting on the journal.
+    * <p>
+    * Meant to be used to revert the effect of {@link #replicationSyncPreserveOldFiles()}. It
+    * should only be called once the synchronization of the backup and live servers is completed.
+    */
+   void replicationSyncFinished();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/JournalLoadInformation.java
----------------------------------------------------------------------
diff --git 
a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/JournalLoadInformation.java
 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/JournalLoadInformation.java
new file mode 100644
index 0000000..ed1b18d
--- /dev/null
+++ 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/JournalLoadInformation.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal;
+
+/**
+ * Plain mutable value object carrying two figures gathered while a journal is loaded: the
+ * number of records and the maximum ID.
+ * <p>
+ * Two instances are equal when both figures match; {@link #hashCode()} is derived from the
+ * same two figures.
+ *
+ * @author Clebert Suconic
+ */
+public class JournalLoadInformation
+{
+
+   private int numberOfRecords;
+
+   private long maxID;
+
+   /**
+    * Creates an instance reporting zero records and a maximum ID of {@code -1}.
+    */
+   public JournalLoadInformation()
+   {
+      this(0, -1);
+   }
+
+   /**
+    * @param numberOfRecords the initial record count
+    * @param maxID the initial maximum ID
+    */
+   public JournalLoadInformation(final int numberOfRecords, final long maxID)
+   {
+      this.maxID = maxID;
+      this.numberOfRecords = numberOfRecords;
+   }
+
+   /**
+    * @return the numberOfRecords
+    */
+   public int getNumberOfRecords()
+   {
+      return numberOfRecords;
+   }
+
+   /**
+    * @param numberOfRecords the numberOfRecords to set
+    */
+   public void setNumberOfRecords(final int numberOfRecords)
+   {
+      this.numberOfRecords = numberOfRecords;
+   }
+
+   /**
+    * @return the maxID
+    */
+   public long getMaxID()
+   {
+      return maxID;
+   }
+
+   /**
+    * @param maxID the maxID to set
+    */
+   public void setMaxID(final long maxID)
+   {
+      this.maxID = maxID;
+   }
+
+   @Override
+   public int hashCode()
+   {
+      // Fold the 64-bit ID into 32 bits, then combine with the record count (31-based mix).
+      final int idHash = (int)(maxID ^ (maxID >>> 32));
+      return 31 * (31 + idHash) + numberOfRecords;
+   }
+
+   @Override
+   public boolean equals(final Object obj)
+   {
+      if (this == obj)
+      {
+         return true;
+      }
+      if (obj == null || getClass() != obj.getClass())
+      {
+         return false;
+      }
+      final JournalLoadInformation that = (JournalLoadInformation)obj;
+      return maxID == that.maxID && numberOfRecords == that.numberOfRecords;
+   }
+
+   @Override
+   public String toString()
+   {
+      final StringBuilder text = new StringBuilder("JournalLoadInformation [maxID=");
+      text.append(maxID);
+      text.append(", numberOfRecords=");
+      text.append(numberOfRecords);
+      text.append("]");
+      return text.toString();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/LoaderCallback.java
----------------------------------------------------------------------
diff --git 
a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/LoaderCallback.java
 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/LoaderCallback.java
new file mode 100644
index 0000000..3d1c156
--- /dev/null
+++ 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/LoaderCallback.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal;
+
+/**
+ * Callback accepted by {@link Journal#load(LoaderCallback)}. On top of the failed-transaction
+ * notification inherited from {@link TransactionFailureCallback}, it is told about prepared
+ * transactions and about added, updated and deleted records.
+ * <p>
+ * NOTE(review): the order in which the methods are invoked during a load is not visible
+ * here - confirm against the journal implementation.
+ *
+ * @author Clebert Suconic
+ */
+public interface LoaderCallback extends TransactionFailureCallback
+{
+   /**
+    * @param preparedTransaction a prepared transaction being reported to the callback
+    */
+   void addPreparedTransaction(PreparedTransactionInfo preparedTransaction);
+
+   /**
+    * @param info the record being added
+    */
+   void addRecord(RecordInfo info);
+
+   /**
+    * @param id ID of the record being deleted
+    */
+   void deleteRecord(long id);
+
+   /**
+    * @param info the record carrying the update
+    */
+   void updateRecord(RecordInfo info);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/PreparedTransactionInfo.java
----------------------------------------------------------------------
diff --git 
a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/PreparedTransactionInfo.java
 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/PreparedTransactionInfo.java
new file mode 100644
index 0000000..9ae3eb0
--- /dev/null
+++ 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/PreparedTransactionInfo.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Holder describing one prepared transaction: its ID, the extra data given at construction,
+ * and two initially empty, caller-filled lists of {@link RecordInfo}.
+ *
+ * @author Tim Fox
+ * @author Clebert Suconic
+ */
+public class PreparedTransactionInfo
+{
+   /** ID passed to the constructor. */
+   public final long id;
+
+   /** Extra data passed to the constructor; the array reference is kept as-is (no copy). */
+   public final byte[] extraData;
+
+   /** Records attached to the transaction; empty until a caller adds to it. */
+   public final List<RecordInfo> records;
+
+   /** Records marked for deletion by the transaction; empty until a caller adds to it. */
+   public final List<RecordInfo> recordsToDelete;
+
+   /**
+    * @param id the transaction ID
+    * @param extraData extra data to associate with the transaction, stored without copying
+    */
+   public PreparedTransactionInfo(final long id, final byte[] extraData)
+   {
+      this.records = new ArrayList<RecordInfo>();
+      this.recordsToDelete = new ArrayList<RecordInfo>();
+      this.extraData = extraData;
+      this.id = id;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/RecordInfo.java
----------------------------------------------------------------------
diff --git 
a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/RecordInfo.java
 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/RecordInfo.java
new file mode 100644
index 0000000..fb78b24
--- /dev/null
+++ 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/RecordInfo.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal;
+
+/**
+ * Value holder describing a single journal record.
+ * <p>
+ * Identity is defined by {@link #id} alone: {@link #equals(Object)} and {@link #hashCode()}
+ * ignore every other field.
+ *
+ * @author Tim Fox
+ * @author Clebert Suconic
+ */
+public class RecordInfo
+{
+   /**
+    * @param id the record ID
+    * @param userRecordType the user-defined record type
+    * @param data the record payload; stored as-is (no copy)
+    * @param isUpdate initial value of {@link #isUpdate}
+    * @param compactCount how many times the record was compacted
+    */
+   public RecordInfo(final long id,
+                     final byte userRecordType,
+                     final byte[] data,
+                     final boolean isUpdate,
+                     final short compactCount)
+   {
+      this.id = id;
+
+      this.userRecordType = userRecordType;
+
+      this.data = data;
+
+      this.isUpdate = isUpdate;
+
+      this.compactCount = compactCount;
+   }
+
+   /**
+    * How many times this record was compacted (up to 7 times).
+    * After the record has reached 7 times, it will always be 7,
+    * as we only store up to 0x7 binary as part of the recordID (binary 111).
+    */
+   public final short compactCount;
+
+   public final long id;
+
+   public final byte userRecordType;
+
+   public final byte[] data;
+
+   /** The only mutable field; initialised from the constructor argument. */
+   public boolean isUpdate;
+
+   public byte getUserRecordType()
+   {
+      return userRecordType;
+   }
+
+   @Override
+   public int hashCode()
+   {
+      return (int) (id >>> 32 ^ id);
+   }
+
+   @Override
+   public boolean equals(final Object other)
+   {
+      if (!(other instanceof RecordInfo))
+      {
+         return false;
+      }
+      RecordInfo r = (RecordInfo) other;
+
+      return r.id == id;
+   }
+
+   /**
+    * Renders the record for diagnostics. A {@code null} payload is printed as
+    * {@code data.length = null} instead of failing with a NullPointerException, and the
+    * parenthesis opened after "RecordInfo" is closed.
+    */
+   @Override
+   public String toString()
+   {
+      // The constructor does not reject a null payload, so a diagnostic method must cope with it.
+      final String dataLength = data == null ? "null" : String.valueOf(data.length);
+
+      return "RecordInfo (id=" + id +
+         ", userRecordType = " +
+         userRecordType +
+         ", data.length = " +
+         dataLength +
+         ", isUpdate = " +
+         isUpdate +
+         ")";
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/SequentialFile.java
----------------------------------------------------------------------
diff --git 
a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/SequentialFile.java
 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/SequentialFile.java
new file mode 100644
index 0000000..94cdfb3
--- /dev/null
+++ 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/SequentialFile.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.core.journal.impl.TimedBuffer;
+
+/**
+ * Abstraction of a journal file: it can be opened, positioned, written, read, synced, renamed,
+ * copied and deleted.
+ * <p>
+ * The {@code ByteBuffer} based operations require buffers compatible with the concrete
+ * implementation (AIO or NIO); see the notes on the individual methods.
+ *
+ * @author Tim Fox
+ * @author Clebert Suconic
+ */
+public interface SequentialFile
+{
+   /**
+    * Creates the file if it doesn't already exist, then opens it
+    */
+   void open() throws Exception;
+
+   boolean isOpen();
+
+   boolean exists();
+
+   /**
+    * Opens the file with an explicit limit on simultaneous writes.
+    *
+    * @param maxIO the maximum number of simultaneous writes accepted
+    * @param useExecutor NOTE(review): presumably whether the open is performed through an
+    *           executor; not visible here - confirm against the implementations
+    * @throws Exception
+    */
+   void open(int maxIO, boolean useExecutor) throws Exception;
+
+   boolean fits(int size);
+
+   int getAlignment() throws Exception;
+
+   int calculateBlockStart(int position) throws Exception;
+
+   String getFileName();
+
+   void fill(int position, int size, byte fillCharacter) throws Exception;
+
+   void delete() throws IOException, InterruptedException, HornetQException;
+
+   // The four write(...) overloads take the payload either as a HornetQBuffer or as an
+   // EncodingSupport, with or without an IOAsyncTask callback.
+
+   void write(HornetQBuffer bytes, boolean sync, IOAsyncTask callback) throws 
Exception;
+
+   void write(HornetQBuffer bytes, boolean sync) throws Exception;
+
+   void write(EncodingSupport bytes, boolean sync, IOAsyncTask callback) 
throws Exception;
+
+   void write(EncodingSupport bytes, boolean sync) throws Exception;
+
+   /**
+    * Write directly to the file without using any buffer
+    * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO
+    *           or NIO). To be safe, use a buffer from the corresponding
+    *           {@link SequentialFileFactory#newBuffer(int)}.
+    */
+   void writeDirect(ByteBuffer bytes, boolean sync, IOAsyncTask callback);
+
+   /**
+    * Write directly to the file without using any buffer
+    * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO
+    *           or NIO). To be safe, use a buffer from the corresponding
+    *           {@link SequentialFileFactory#newBuffer(int)}.
+    */
+   void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
+
+   /**
+    * Write directly to the file. This is used by compacting and other places where we write a
+    * big buffer in a single shot. writeInternal should always block until the entire write is
+    * sync on disk.
+    * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO
+    *           or NIO). To be safe, use a buffer from the corresponding
+    *           {@link SequentialFileFactory#newBuffer(int)}.
+    */
+   void writeInternal(ByteBuffer bytes) throws Exception;
+
+   /**
+    * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO
+    *           or NIO). To be safe, use a buffer from the corresponding
+    *           {@link SequentialFileFactory#newBuffer(int)}.
+    */
+   int read(ByteBuffer bytes, IOAsyncTask callback) throws Exception;
+
+   /**
+    * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO
+    *           or NIO). To be safe, use a buffer from the corresponding
+    *           {@link SequentialFileFactory#newBuffer(int)}.
+    */
+   int read(ByteBuffer bytes) throws Exception;
+
+   void position(long pos) throws IOException;
+
+   long position();
+
+   void close() throws Exception;
+
+   void waitForClose() throws Exception;
+
+   void sync() throws IOException;
+
+   long size() throws Exception;
+
+   void renameTo(String newFileName) throws Exception;
+
+   SequentialFile cloneFile();
+
+   // Note: despite the parameter name, the target is passed as a SequentialFile, not as a name.
+   void copyTo(SequentialFile newFileName) throws Exception;
+
+   void setTimedBuffer(TimedBuffer buffer);
+
+   /**
+    * Returns a native File of the file underlying this sequential file.
+    */
+   File getJavaFile();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/SequentialFileFactory.java
----------------------------------------------------------------------
diff --git 
a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/SequentialFileFactory.java
 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/SequentialFileFactory.java
new file mode 100644
index 0000000..0492641
--- /dev/null
+++ 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/SequentialFileFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Factory for {@link SequentialFile} instances; it also hands out and releases the
+ * {@link ByteBuffer}s used with them and exposes alignment/block-size helpers.
+ *
+ * @author Tim Fox
+ * @author Clebert Suconic
+ */
+public interface SequentialFileFactory
+{
+   /**
+    * @param fileName name of the file to create the {@link SequentialFile} for
+    * @param maxIO NOTE(review): presumably the maximum number of simultaneous writes, as on
+    *           {@link SequentialFile#open(int, boolean)} - confirm
+    */
+   SequentialFile createSequentialFile(String fileName, int maxIO);
+
+   /**
+    * Lists files that end with the given extension.
+    * <p>
+    * This method inserts a "." before the extension.
+    * @param extension the extension to look for, without the leading "."
+    * @return the matching files, as strings
+    * @throws Exception
+    */
+   List<String> listFiles(String extension) throws Exception;
+
+   boolean isSupportsCallbacks();
+
+   /** The SequentialFile will call this method when a disk IO Error happens during the live phase. */
+   void onIOError(Exception exception, String message, SequentialFile file);
+
+   /**
+    * Used for cases where you need a direct buffer outside of the journal context.
+    * This is because the native layer has a method that can be reused in certain cases like
+    * paging.
+    */
+   ByteBuffer allocateDirectBuffer(int size);
+
+   /**
+    * Used for cases where you need a direct buffer outside of the journal context.
+    * This is because the native layer has a method that can be reused in certain cases like
+    * paging.
+    */
+   void releaseDirectBuffer(ByteBuffer buffer);
+
+   /**
+    * Note: You need to release the buffer if it is used for reading operations. You don't need
+    * to do it if using writing operations (the AIO buffer listener will take care of writing
+    * operations).
+    * @param size the requested size
+    * @return the allocated ByteBuffer
+    */
+   ByteBuffer newBuffer(int size);
+
+   void releaseBuffer(ByteBuffer buffer);
+
+   void activateBuffer(SequentialFile file);
+
+   void deactivateBuffer();
+
+   // To be used in tests only
+   ByteBuffer wrapBuffer(byte[] bytes);
+
+   int getAlignment();
+
+   int calculateBlockSize(int bytes);
+
+   String getDirectory();
+
+   void clearBuffer(ByteBuffer buffer);
+
+   void start();
+
+   void stop();
+
+   /**
+    * Creates the directory if it does not exist yet.
+    */
+   void createDirs() throws Exception;
+
+   void flush();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/TestableJournal.java
----------------------------------------------------------------------
diff --git 
a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/TestableJournal.java
 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/TestableJournal.java
new file mode 100644
index 0000000..8f578bb
--- /dev/null
+++ 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/TestableJournal.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal;
+
+import org.apache.activemq6.core.journal.impl.JournalFile;
+
+/**
+ * Extension of {@link Journal} that additionally exposes counters, configuration values and
+ * reclaim/compact triggers; the name indicates it exists for the benefit of tests.
+ *
+ * @author Tim Fox
+ * @author Clebert Suconic
+ */
+public interface TestableJournal extends Journal
+{
+   int getDataFilesCount();
+
+   int getFreeFilesCount();
+
+   int getOpenedFilesCount();
+
+   int getIDMapSize();
+
+   String debug() throws Exception;
+
+   void debugWait() throws Exception;
+
+   // Redeclared here; already declared by Journal.
+   int getFileSize();
+
+   int getMinFiles();
+
+   String getFilePrefix();
+
+   String getFileExtension();
+
+   int getMaxAIO();
+
+   // Redeclared here; already declared by Journal.
+   void forceMoveNextFile() throws Exception;
+
+   void setAutoReclaim(boolean autoReclaim);
+
+   boolean isAutoReclaim();
+
+   void testCompact();
+
+   JournalFile getCurrentFile();
+
+   /**
+    * This method is called automatically when a new file is opened.
+    * <p>
+    * It will among other things, remove stale files and make them available for reuse.
+    * <p>
+    * This method locks the journal.
+    * @return true if it needs to re-check due to cleanup or other factors
+    */
+   boolean checkReclaimStatus() throws Exception;
+
+   // Redeclared here; already declared by Journal.
+   JournalFile[] getDataFiles();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/TransactionFailureCallback.java
----------------------------------------------------------------------
diff --git 
a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/TransactionFailureCallback.java
 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/TransactionFailureCallback.java
new file mode 100644
index 0000000..09d66d6
--- /dev/null
+++ 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/TransactionFailureCallback.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal;
+
+import java.util.List;
+
+/**
+ * A Callback to receive information about bad transactions for extra cleanup required for
+ * broken transactions such as large messages.
+ *
+ * @author Clebert Suconic
+ */
+public interface TransactionFailureCallback
+{
+
+   /**
+    * To be used to inform about transactions without commit records.
+    * This could be used to remove extra resources associated with the transactions (such as
+    * external files received during the transaction).
+    *
+    * @param transactionID ID of the transaction that has no commit record
+    * @param records records belonging to the transaction
+    * @param recordsToDelete records the transaction had marked for deletion
+    */
+   void failedTransaction(long transactionID, List<RecordInfo> records, 
List<RecordInfo> recordsToDelete);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AIOSequentialFile.java
----------------------------------------------------------------------
diff --git 
a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AIOSequentialFile.java
 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AIOSequentialFile.java
new file mode 100644
index 0000000..11236c7
--- /dev/null
+++ 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AIOSequentialFile.java
@@ -0,0 +1,329 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.core.asyncio.AsynchronousFile;
+import org.apache.activemq6.core.asyncio.BufferCallback;
+import org.apache.activemq6.core.asyncio.IOExceptionListener;
+import org.apache.activemq6.core.asyncio.impl.AsynchronousFileImpl;
+import org.apache.activemq6.core.journal.IOAsyncTask;
+import org.apache.activemq6.core.journal.SequentialFile;
+import org.apache.activemq6.core.journal.SequentialFileFactory;
+
+/**
+ *
+ * A AIOSequentialFile
+ *
+ * @author <a href="mailto:[email protected]";>Clebert Suconic</a>
+ *
+ */
+public class AIOSequentialFile extends AbstractSequentialFile implements 
IOExceptionListener
+{
+   private boolean opened = false;
+
+   private final int maxIO;
+
+   private AsynchronousFile aioFile;
+
+   private final BufferCallback bufferCallback;
+
+   /** The pool for Thread pollers */
+   private final Executor pollerExecutor;
+
+   /**
+    * Creates the sequential file object. The underlying asynchronous file is not opened here.
+    *
+    * @param factory                   factory handed to the parent class
+    * @param bufferSize                not used by this constructor
+    * @param bufferTimeoutMilliseconds not used by this constructor
+    * @param directory                 directory that holds the file
+    * @param fileName                  name of the file inside {@code directory}
+    * @param maxIO                     value that {@link #open()} later passes to {@code open(int, boolean)}
+    * @param bufferCallback            callback installed on the underlying file when it is opened
+    * @param writerExecutor            executor handed to the parent class
+    * @param pollerExecutor            executor handed to the underlying asynchronous file when it is opened
+    */
+   public AIOSequentialFile(final SequentialFileFactory factory,
+                            final int bufferSize,
+                            final long bufferTimeoutMilliseconds,
+                            final String directory,
+                            final String fileName,
+                            final int maxIO,
+                            final BufferCallback bufferCallback,
+                            final Executor writerExecutor,
+                            final Executor pollerExecutor)
+   {
+      super(directory, new File(directory + "/" + fileName), factory, writerExecutor);
+
+      this.pollerExecutor = pollerExecutor;
+      this.bufferCallback = bufferCallback;
+      this.maxIO = maxIO;
+   }
+
+   /**
+    * @return the current value of the {@code opened} flag, which {@code open(int, boolean)} sets
+    *         and {@code close()} clears
+    */
+   public boolean isOpen()
+   {
+      return this.opened;
+   }
+
+   /**
+    * Returns the block size reported by the underlying asynchronous file.
+    *
+    * @return the value of {@code aioFile.getBlockSize()}
+    * @throws IllegalStateException if the file is not opened
+    */
+   public int getAlignment()
+   {
+      checkOpened();
+
+      final int blockSize = aioFile.getBlockSize();
+
+      return blockSize;
+   }
+
+   /**
+    * Rounds {@code position} up to the next multiple of {@link #getAlignment()}.
+    *
+    * @param position the position to align
+    * @return {@code position} itself when it is already a multiple of the alignment, otherwise the
+    *         next multiple above it
+    * @throws IllegalStateException if the file is not opened
+    */
+   public int calculateBlockStart(final int position)
+   {
+      final int blockSize = getAlignment();
+
+      // Already on a block boundary: nothing to round.
+      if (position % blockSize == 0)
+      {
+         return position;
+      }
+
+      return (position / blockSize + 1) * blockSize;
+   }
+
+   /**
+    * Creates a new, unopened {@code AIOSequentialFile} for the same directory and file name,
+    * sharing this instance's factory, maxIO, buffer callback and executors.
+    *
+    * @return the new file object
+    */
+   public SequentialFile cloneFile()
+   {
+      final String parentDirectory = getFile().getParent();
+      final String name = getFileName();
+
+      // The two -1 arguments are the buffer size and timeout, which the constructor does not use.
+      return new AIOSequentialFile(factory, -1, -1, parentDirectory, name,
+                                   maxIO, bufferCallback, writerExecutor, pollerExecutor);
+   }
+
+   /**
+    * Closes the file if it is open; does nothing otherwise.
+    * <p>
+    * After the parent's {@code close()} has run, the file is marked as not opened, the timed
+    * buffer reference is dropped and the underlying asynchronous file is closed. Threads blocked
+    * in {@link #waitForClose()} are always woken once the file has been marked as not opened,
+    * even when closing the underlying file fails; otherwise they would keep waiting for a
+    * notification that never arrives.
+    *
+    * @throws IOException          declared by the overridden method
+    * @throws InterruptedException declared by the overridden method
+    * @throws HornetQException     declared by the overridden method
+    */
+   @Override
+   public synchronized void close() throws IOException, InterruptedException, HornetQException
+   {
+      if (!opened)
+      {
+         return;
+      }
+
+      super.close();
+
+      opened = false;
+
+      timedBuffer = null;
+
+      try
+      {
+         aioFile.close();
+         aioFile = null;
+      }
+      finally
+      {
+         // isOpen() is already false at this point, so waiters must be released on every path.
+         notifyAll();
+      }
+   }
+
+   /**
+    * Blocks the calling thread until {@link #isOpen()} returns {@code false}.
+    * The wait happens on this object's monitor.
+    *
+    * @throws Exception if the wait fails, e.g. the thread is interrupted
+    */
+   @Override
+   public synchronized void waitForClose() throws Exception
+   {
+      for (;;)
+      {
+         if (!isOpen())
+         {
+            return;
+         }
+
+         wait();
+      }
+   }
+
+   public void fill(final int position, final int size, final byte 
fillCharacter) throws Exception
+   {
+      checkOpened();
+
+      int fileblockSize = aioFile.getBlockSize();
+
+      int blockSize = fileblockSize;
+
+      if (size % (100 * 1024 * 1024) == 0)
+      {
+         blockSize = 100 * 1024 * 1024;
+      }
+      else if (size % (10 * 1024 * 1024) == 0)
+      {
+         blockSize = 10 * 1024 * 1024;
+      }
+      else if (size % (1024 * 1024) == 0)
+      {
+         blockSize = 1024 * 1024;
+      }
+      else if (size % (10 * 1024) == 0)
+      {
+         blockSize = 10 * 1024;
+      }
+      else
+      {
+         blockSize = fileblockSize;
+      }
+
+      int blocks = size / blockSize;
+
+      if (size % blockSize != 0)
+      {
+         blocks++;
+      }
+
+      int filePosition = position;
+
+      if (position % fileblockSize != 0)
+      {
+         filePosition = (position / fileblockSize + 1) * fileblockSize;
+      }
+
+      aioFile.fill(filePosition, blocks, blockSize, fillCharacter);
+
+      fileSize = aioFile.size();
+   }
+
+   /**
+    * Opens the file with the {@code maxIO} given at construction time, using the writer executor.
+    *
+    * @throws Exception if the file cannot be opened
+    */
+   public void open() throws Exception
+   {
+      this.open(this.maxIO, true);
+   }
+
+   /**
+    * Opens the underlying asynchronous file.
+    * <p>
+    * Marks this file as opened, creates a new {@code AsynchronousFileImpl} (with this object as
+    * its IO exception listener) and opens it on the file's absolute path. On success the position
+    * is reset to 0, the buffer callback is installed and {@code fileSize} is read from the file.
+    *
+    * @param maxIO       value passed to the underlying file's {@code open}
+    * @param useExecutor when {@code true} the writer executor is handed to the underlying file,
+    *                    otherwise {@code null} is passed in its place
+    * @throws HornetQException if the underlying file fails to open; the factory is notified through
+    *                          {@code onIOError} before the exception is rethrown
+    */
+   public synchronized void open(final int maxIO, final boolean useExecutor) throws HornetQException
+   {
+      opened = true;
+
+      final Executor executorForWrites;
+
+      if (useExecutor)
+      {
+         executorForWrites = writerExecutor;
+      }
+      else
+      {
+         executorForWrites = null;
+      }
+
+      aioFile = new AsynchronousFileImpl(executorForWrites, pollerExecutor, this);
+
+      try
+      {
+         aioFile.open(getFile().getAbsolutePath(), maxIO);
+      }
+      catch (HornetQException failure)
+      {
+         factory.onIOError(failure, failure.getMessage(), this);
+         throw failure;
+      }
+
+      position.set(0);
+
+      aioFile.setBufferCallback(bufferCallback);
+
+      fileSize = aioFile.size();
+   }
+
+   /**
+    * Installs {@code callback} on the underlying asynchronous file.
+    * No open check is performed here, so the file must have been opened before this is called.
+    *
+    * @param callback the callback forwarded to the underlying file
+    */
+   public void setBufferCallback(final BufferCallback callback)
+   {
+      this.aioFile.setBufferCallback(callback);
+   }
+
+   /**
+    * Submits a read of {@code bytes.limit()} bytes starting at the current position, and advances
+    * the position by that amount. The buffer is rewound before the read is submitted.
+    *
+    * @param bytes    destination buffer; its limit is the number of bytes requested
+    * @param callback task handed to the underlying file together with the read request
+    * @return the number of bytes requested (the buffer's limit)
+    * @throws HornetQException if the underlying file rejects the request
+    */
+   public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws HornetQException
+   {
+      final int length = bytes.limit();
+
+      // Reserve the region atomically so that every read gets its own range.
+      final long offset = position.getAndAdd(length);
+
+      bytes.rewind();
+
+      aioFile.read(offset, length, bytes, callback);
+
+      return length;
+   }
+
+   /**
+    * Reads into {@code bytes} and blocks until the read has completed.
+    *
+    * @param bytes destination buffer; its limit is the number of bytes requested
+    * @return the number of bytes requested (the buffer's limit)
+    * @throws Exception if the read or the wait for its completion fails
+    */
+   public int read(final ByteBuffer bytes) throws Exception
+   {
+      final SimpleWaitIOCallback done = new SimpleWaitIOCallback();
+
+      final int requested = read(bytes, done);
+
+      done.waitCompletion();
+
+      return requested;
+   }
+
+   public void sync()
+   {
+      throw new UnsupportedOperationException("This method is not supported on 
AIO");
+   }
+
+   public long size() throws Exception
+   {
+      if (aioFile == null)
+      {
+         return getFile().length();
+      }
+      else
+      {
+         return aioFile.size();
+      }
+   }
+
+   /**
+    * @return {@code "AIOSequentialFile:"} followed by the absolute path of the file
+    */
+   @Override
+   public String toString()
+   {
+      final String absolutePath = getFile().getAbsolutePath();
+
+      return "AIOSequentialFile:" + absolutePath;
+   }
+
+   // Public methods
+   // -----------------------------------------------------------------------------------------------------
+
+   /**
+    * Forwards an IO error to the factory, identifying this file as its source.
+    *
+    * @param code    the exception describing the error
+    * @param message the message forwarded along with the exception
+    */
+   @Override
+   public void onIOException(Exception code, String message)
+   {
+      this.factory.onIOError(code, message, this);
+   }
+
+
+   public void writeDirect(final ByteBuffer bytes, final boolean sync) throws 
Exception
+   {
+      if (sync)
+      {
+         SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
+
+         writeDirect(bytes, true, completion);
+
+         completion.waitCompletion();
+      }
+      else
+      {
+         writeDirect(bytes, false, DummyCallback.getInstance());
+      }
+   }
+
+   /**
+    * Submits a write of {@code bytes} at the current position and advances the position by the
+    * block-aligned size of the buffer, i.e. {@code factory.calculateBlockSize(bytes.limit())}.
+    *
+    * @param bytes    the data to write
+    * @param sync     Not used on AIO
+    * @param callback task handed to the underlying file together with the write request
+    */
+   public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback)
+   {
+      final int alignedLength = factory.calculateBlockSize(bytes.limit());
+
+      // Reserve the region atomically so that every write gets its own range.
+      final long offset = position.getAndAdd(alignedLength);
+
+      aioFile.write(offset, alignedLength, bytes, callback);
+   }
+
+   /**
+    * Writes {@code bytes} through the underlying file's {@code writeInternal}, at the current
+    * position, and advances the position by the block-aligned size of the buffer.
+    *
+    * @param bytes the data to write
+    * @throws HornetQException if the underlying file reports a failure
+    */
+   public void writeInternal(final ByteBuffer bytes) throws HornetQException
+   {
+      final int alignedLength = factory.calculateBlockSize(bytes.limit());
+
+      final long offset = position.getAndAdd(alignedLength);
+
+      aioFile.writeInternal(offset, alignedLength, bytes);
+   }
+
+   // Protected methods
+   // -----------------------------------------------------------------------------------------------------
+
+   @Override
+   protected ByteBuffer newBuffer(int size, int limit)
+   {
+      size = factory.calculateBlockSize(size);
+      limit = factory.calculateBlockSize(limit);
+
+      ByteBuffer buffer = factory.newBuffer(size);
+      buffer.limit(limit);
+      return buffer;
+   }
+
+   // Private methods
+   // -----------------------------------------------------------------------------------------------------
+
+   /**
+    * Guard used by operations that need the underlying asynchronous file.
+    *
+    * @throws IllegalStateException if there is no underlying file or the file is not marked as opened
+    */
+   private void checkOpened()
+   {
+      if (aioFile != null && opened)
+      {
+         return;
+      }
+
+      throw new IllegalStateException("File not opened");
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git 
a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AIOSequentialFileFactory.java
 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AIOSequentialFileFactory.java
new file mode 100644
index 0000000..eb46fd3
--- /dev/null
+++ 
b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AIOSequentialFileFactory.java
@@ -0,0 +1,358 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq6.api.core.HornetQInterruptedException;
+import org.apache.activemq6.core.asyncio.BufferCallback;
+import org.apache.activemq6.core.asyncio.impl.AsynchronousFileImpl;
+import org.apache.activemq6.core.journal.IOCriticalErrorListener;
+import org.apache.activemq6.core.journal.SequentialFile;
+import org.apache.activemq6.core.libaio.Native;
+import org.apache.activemq6.journal.HornetQJournalLogger;
+import org.apache.activemq6.utils.HornetQThreadFactory;
+
+/**
+ * A AIOSequentialFileFactory
+ *
+ * @author [email protected]
+ */
+public final class AIOSequentialFileFactory extends 
AbstractSequentialFileFactory
+{
+   private static final boolean trace = 
HornetQJournalLogger.LOGGER.isTraceEnabled();
+
+   private final ReuseBuffersController buffersControl = new 
ReuseBuffersController();
+
+   private ExecutorService pollerExecutor;
+
+   /**
+    * Sends {@code message} to the journal logger at trace level.
+    * <p>
+    * This method exists just to make debugging easier: the single call below can be switched
+    * temporarily to another level (e.g. info) while debugging the journal.
+    *
+    * @param message the text to log
+    */
+   private static void trace(final String message)
+   {
+      HornetQJournalLogger.LOGGER.trace(message);
+   }
+
+   /**
+    * Creates a factory with the default AIO buffer size and buffer timeout, with rate logging
+    * disabled and without a critical error listener.
+    *
+    * @param journalDir the journal directory
+    */
+   public AIOSequentialFileFactory(final String journalDir)
+   {
+      this(journalDir, JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
+           JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, false, null);
+   }
+
+   /**
+    * Creates a factory with the default AIO buffer size and buffer timeout, with rate logging
+    * disabled.
+    *
+    * @param journalDir the journal directory
+    * @param listener   critical error listener handed to the parent class
+    */
+   public AIOSequentialFileFactory(final String journalDir, final IOCriticalErrorListener listener)
+   {
+      this(journalDir, JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
+           JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, false, listener);
+   }
+
+   /**
+    * Creates a factory without a critical error listener.
+    *
+    * @param journalDir    the journal directory
+    * @param bufferSize    buffer size handed to the parent class
+    * @param bufferTimeout buffer timeout handed to the parent class
+    * @param logRates      rate-logging flag handed to the parent class
+    */
+   public AIOSequentialFileFactory(final String journalDir,
+                                   final int bufferSize,
+                                   final int bufferTimeout,
+                                   final boolean logRates)
+   {
+      // Same as the five-argument constructor, with no listener.
+      this(journalDir, bufferSize, bufferTimeout, logRates, null);
+   }
+
+   /**
+    * Creates a factory, forwarding every argument to the parent class.
+    *
+    * @param journalDir    the journal directory
+    * @param bufferSize    buffer size handed to the parent class
+    * @param bufferTimeout buffer timeout handed to the parent class
+    * @param logRates      rate-logging flag handed to the parent class
+    * @param listener      critical error listener handed to the parent class; may be {@code null}
+    */
+   public AIOSequentialFileFactory(final String journalDir,
+                                   final int bufferSize,
+                                   final int bufferTimeout,
+                                   final boolean logRates,
+                                   final IOCriticalErrorListener listener)
+   {
+      // The literal true is the parent constructor's second argument; its meaning is defined there.
+      super(journalDir, true, bufferSize, bufferTimeout, logRates, listener);
+   }
+
+   /**
+    * Creates an {@code AIOSequentialFile} in the journal directory. The file shares this
+    * factory's buffer-reuse callback, write executor and poller executor.
+    *
+    * @param fileName name of the file inside the journal directory
+    * @param maxIO    value the file later uses when it is opened
+    * @return the new, unopened file object
+    */
+   public SequentialFile createSequentialFile(final String fileName, final int maxIO)
+   {
+      return new AIOSequentialFile(this, bufferSize, bufferTimeout, journalDir, fileName, maxIO,
+                                   buffersControl.callback, writeExecutor, pollerExecutor);
+   }
+
+   /**
+    * @return always {@code true} for this factory
+    */
+   public boolean isSupportsCallbacks()
+   {
+      final boolean supportsCallbacks = true;
+
+      return supportsCallbacks;
+   }
+
+   /**
+    * @return whatever {@code AsynchronousFileImpl.isLoaded()} reports
+    */
+   public static boolean isSupported()
+   {
+      final boolean loaded = AsynchronousFileImpl.isLoaded();
+
+      return loaded;
+   }
+
+   /**
+    * Allocates a buffer through {@code AsynchronousFileImpl.newBuffer}.
+    * <p>
+    * The buffer on AIO has to be a multiple of the alignment, so the requested size is rounded up
+    * with {@link #calculateBlockSize(int)} instead of repeating the rounding with a hard-coded
+    * block size. The limit of the returned buffer is set to the size that was requested.
+    *
+    * @param size the number of bytes the caller needs
+    * @return a buffer whose size is {@code size} rounded up to the alignment and whose limit is {@code size}
+    */
+   public ByteBuffer allocateDirectBuffer(final int size)
+   {
+      final ByteBuffer buffer = AsynchronousFileImpl.newBuffer(calculateBlockSize(size));
+
+      buffer.limit(size);
+
+      return buffer;
+   }
+
+   /**
+    * Hands {@code buffer} to {@code Native.destroyBuffer}.
+    *
+    * @param buffer the buffer to destroy
+    */
+   public void releaseDirectBuffer(final ByteBuffer buffer)
+   {
+      final ByteBuffer doomed = buffer;
+
+      Native.destroyBuffer(doomed);
+   }
+
+   /**
+    * Obtains a buffer from the buffer-reuse controller.
+    * <p>
+    * The requested size is first rounded up to the alignment with
+    * {@link #calculateBlockSize(int)}, rather than repeating the rounding with a hard-coded block
+    * size.
+    *
+    * @param size the number of bytes the caller needs
+    * @return the buffer returned by the buffer-reuse controller for the rounded size
+    */
+   public ByteBuffer newBuffer(int size)
+   {
+      return buffersControl.newBuffer(calculateBlockSize(size));
+   }
+
+   /**
+    * Hands {@code directByteBuffer} to {@code AsynchronousFileImpl.clearBuffer}.
+    *
+    * @param directByteBuffer the buffer to clear
+    */
+   public void clearBuffer(final ByteBuffer directByteBuffer)
+   {
+      final ByteBuffer target = directByteBuffer;
+
+      AsynchronousFileImpl.clearBuffer(target);
+   }
+
+   /**
+    * @return the alignment used by this factory when rounding sizes, always 512
+    */
+   public int getAlignment()
+   {
+      final int alignment = 512;
+
+      return alignment;
+   }
+
+   /**
+    * For tests only. Copies {@code bytes} into a buffer obtained from {@link #newBuffer(int)}.
+    * The buffer is returned as left by the bulk {@code put}; it is not rewound or flipped here.
+    *
+    * @param bytes the content to copy
+    * @return the buffer holding a copy of {@code bytes}
+    */
+   public ByteBuffer wrapBuffer(final byte[] bytes)
+   {
+      final ByteBuffer copy = newBuffer(bytes.length);
+
+      copy.put(bytes);
+
+      return copy;
+   }
+
+   /**
+    * Rounds {@code position} up to the next multiple of {@link #getAlignment()}.
+    *
+    * @param position the value to align
+    * @return {@code position} itself when it is already a multiple of the alignment, otherwise the
+    *         next multiple above it
+    */
+   public int calculateBlockSize(final int position)
+   {
+      final int blockSize = getAlignment();
+
+      // Already on a block boundary: nothing to round.
+      if (position % blockSize == 0)
+      {
+         return position;
+      }
+
+      return (position / blockSize + 1) * blockSize;
+   }
+
+   /**
+    * Hands {@code buffer} to {@code Native.destroyBuffer} while holding this factory's monitor.
+    *
+    * @param buffer the buffer to destroy
+    * @see org.apache.activemq6.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
+    */
+   @Override
+   public synchronized void releaseBuffer(final ByteBuffer buffer)
+   {
+      final ByteBuffer doomed = buffer;
+
+      Native.destroyBuffer(doomed);
+   }
+
+   /**
+    * Starts the parent factory and then creates the cached thread pool used as poller executor.
+    * The pool's thread factory is named after this instance's identity hash code and uses this
+    * class's class loader.
+    */
+   @Override
+   public void start()
+   {
+      super.start();
+
+      final String poolName = "HornetQ-AIO-poller-pool" + System.identityHashCode(this);
+      final ClassLoader loader = AIOSequentialFileFactory.getThisClassLoader();
+
+      pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory(poolName, true, loader));
+   }
+
+   /**
+    * Stops the buffer-reuse controller, shuts the poller executor down (waiting up to
+    * {@code EXECUTOR_TIMEOUT} seconds for it to terminate) and stops the parent factory.
+    * <p>
+    * The parent's {@code stop()} runs in a {@code finally} block, so it is still executed when the
+    * wait for the poller executor is interrupted or an earlier step fails.
+    *
+    * @throws HornetQInterruptedException if the thread is interrupted while waiting for the
+    *                                     poller executor to terminate
+    */
+   @Override
+   public void stop()
+   {
+      try
+      {
+         buffersControl.stop();
+
+         if (pollerExecutor != null)
+         {
+            pollerExecutor.shutdown();
+
+            try
+            {
+               if (!pollerExecutor.awaitTermination(AbstractSequentialFileFactory.EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+               {
+                  // The exception is only a carrier for the stack trace of the caller.
+                  HornetQJournalLogger.LOGGER.timeoutOnPollerShutdown(new Exception("trace"));
+               }
+            }
+            catch (InterruptedException e)
+            {
+               throw new HornetQInterruptedException(e);
+            }
+         }
+      }
+      finally
+      {
+         // Always give the parent the chance to stop what it owns.
+         super.stop();
+      }
+   }
+
+   /**
+    * Stops the factory when the instance is finalized.
+    */
+   @Override
+   protected void finalize()
+   {
+      this.stop();
+   }
+
+   /**
+    * Class that will control buffer-reuse
+    */
+   private class ReuseBuffersController
+   {
+      private volatile long bufferReuseLastTime = System.currentTimeMillis();
+
+      /**
+       * This queue is fed by {@link 
org.apache.activemq6.core.journal.impl.AIOSequentialFileFactory.ReuseBuffersController.LocalBufferCallback}
+       * which is called directly by NIO or NIO. On the case of the AIO this 
is almost called by the native layer as
+       * soon as the buffer is not being used any more and ready to be reused 
or GCed
+       */
+      private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffersQueue = new 
ConcurrentLinkedQueue<ByteBuffer>();
+
+      private boolean stopped = false;
+
+      final BufferCallback callback = new LocalBufferCallback();
+
+      public ByteBuffer newBuffer(final int size)
+      {
+         // if a new buffer wasn't requested in 10 seconds, we clear the queue
+         // This is being done this way as we don't need another Timeout Thread
+         // just to cleanup this
+         if (bufferSize > 0 && System.currentTimeMillis() - 
bufferReuseLastTime > 10000)
+         {
+            if (AIOSequentialFileFactory.trace)
+            {
+               AIOSequentialFileFactory.trace("Clearing reuse buffers queue 
with " + reuseBuffersQueue.size() +
+                                                 " elements");
+            }
+
+            bufferReuseLastTime = System.currentTimeMillis();
+
+            clearPoll();
+         }
+
+         // if a buffer is bigger than the configured-bufferSize, we just 
create a new
+         // buffer.
+         if (size > bufferSize)
+         {
+            return AsynchronousFileImpl.newBuffer(size);
+         }
+         else
+         {
+            // We need to allocate buffers following the rules of the storage
+            // being used (AIO/NIO)
+            int alignedSize = calculateBlockSize(size);
+
+            // Try getting a buffer from the queue...
+            ByteBuffer buffer = reuseBuffersQueue.poll();
+
+            if (buffer == null)
+            {
+               // if empty create a new one.
+               buffer = AsynchronousFileImpl.newBuffer(bufferSize);
+
+               buffer.limit(alignedSize);
+            }
+            else
+            {
+               clearBuffer(buffer);
+
+               // set the limit of the buffer to the bufferSize being required
+               buffer.limit(alignedSize);
+            }
+
+            buffer.rewind();
+
+            return buffer;
+         }
+      }
+
+      public synchronized void stop()
+      {
+         stopped = true;
+         clearPoll();
+      }
+
+      public synchronized void clearPoll()
+      {
+         ByteBuffer reusedBuffer;
+
+         while ((reusedBuffer = reuseBuffersQueue.poll()) != null)
+         {
+            releaseBuffer(reusedBuffer);
+         }
+      }
+
+      private class LocalBufferCallback implements BufferCallback
+      {
+         public void bufferDone(final ByteBuffer buffer)
+         {
+            synchronized (ReuseBuffersController.this)
+            {
+
+               if (stopped)
+               {
+                  releaseBuffer(buffer);
+               }
+               else
+               {
+                  bufferReuseLastTime = System.currentTimeMillis();
+
+                  // If a buffer has any other than the configured bufferSize, 
the buffer
+                  // will be just sent to GC
+                  if (buffer.capacity() == bufferSize)
+                  {
+                     reuseBuffersQueue.offer(buffer);
+                  }
+                  else
+                  {
+                     releaseBuffer(buffer);
+                  }
+               }
+            }
+         }
+      }
+   }
+
+   /**
+    * Looks up the class loader of this class inside a privileged action.
+    *
+    * @return the class loader of {@code AIOSequentialFileFactory}
+    */
+   private static ClassLoader getThisClassLoader()
+   {
+      final PrivilegedAction<ClassLoader> lookup = new PrivilegedAction<ClassLoader>()
+      {
+         public ClassLoader run()
+         {
+            return AIOSequentialFileFactory.class.getClassLoader();
+         }
+      };
+
+      return AccessController.doPrivileged(lookup);
+   }
+
+   /**
+    * @return the simple class name, the {@code stopped} flag of the buffer-reuse controller and
+    *         the parent's {@code toString()}
+    */
+   @Override
+   public String toString()
+   {
+      final String simpleName = AIOSequentialFileFactory.class.getSimpleName();
+
+      return simpleName + "(buffersControl.stopped=" + buffersControl.stopped + "):" + super.toString();
+   }
+}

Reply via email to