Repository: activemq-artemis Updated Branches: refs/heads/master 36c965927 -> 6984d39a9
ARTEMIS-1151 Adapting TimedBuffer and NIO Buffer Pooling - NIO/ASYNCIO new TimedBuffer with an adaptive batch window heuristic - NIO/ASYNCIO improved TimedBuffer write monitoring with lightweight concurrent performance counters - NIO/ASYNCIO journal/paging operations benefit from fewer buffer copies - NIO/ASYNCIO any buffer copy is always performed as a raw bulk copy using SIMD intrinsics (System::arraycopy) or memcpy under the hood - NIO improved buffer clearing using SIMD intrinsics (Arrays::fill) and/or memset - NIO journal operations pool off-heap buffers per thread (TLAB-style) by default, retaining only the largest buffer - NIO improved file copy operations using zero-copy FileChannel::transferFrom - NIO improved zeroing using a single pooled OS-page-sized buffer to zero the file via positional writes (pwrite on Linux) - NIO deterministic release of unpooled direct buffers to avoid OOM errors due to slow GC - Exposed the OS page size via the Env class Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/21c9ed85 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/21c9ed85 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/21c9ed85 Branch: refs/heads/master Commit: 21c9ed85cf6b9a53debdd32747bd42b2e733da80 Parents: 36c9659 Author: Francesco Nigro <[email protected]> Authored: Tue May 2 11:47:44 2017 +0200 Committer: Clebert Suconic <[email protected]> Committed: Mon May 8 11:55:28 2017 -0400 ---------------------------------------------------------------------- .../cli/commands/util/SyncCalculation.java | 1 + .../org/apache/activemq/artemis/utils/Env.java | 47 +++- .../artemis/core/io/AbstractSequentialFile.java | 30 +-- .../artemis/core/io/buffer/TimedBuffer.java | 252 +++++++++---------- .../artemis/core/io/nio/NIOSequentialFile.java | 105 +++++--- .../core/io/nio/NIOSequentialFileFactory.java | 82 +++++- .../unit/core/journal/impl/TimedBufferTest.java | 160 ------------ 
7 files changed, 323 insertions(+), 354 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21c9ed85/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java index a5e38d7..25c8b27 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java @@ -181,6 +181,7 @@ public class SyncCalculation { case NIO: factory = new NIOSequentialFileFactory(datafolder, 1).setDatasync(datasync); + ((NIOSequentialFileFactory) factory).disableBufferReuse(); factory.start(); return factory; case ASYNCIO: http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21c9ed85/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java index d7fc48a..94f69d3 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java @@ -17,18 +17,47 @@ package org.apache.activemq.artemis.utils; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; + /** * Utility that detects various properties specific to the current runtime * environment, such as JVM bitness and OS type. */ public final class Env { - /** The system will change a few logs and semantics to be suitable to - * run a long testsuite. 
- * Like a few log entries that are only valid during a production system. - * or a few cases we need to know as warn on the testsuite and as log in production. */ - private static boolean testEnv = false; + private static final int OS_PAGE_SIZE; + static { + //most common OS page size value + int osPageSize = 4096; + sun.misc.Unsafe instance; + try { + Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); + field.setAccessible(true); + instance = (sun.misc.Unsafe) field.get((Object) null); + } catch (Throwable t) { + try { + Constructor<sun.misc.Unsafe> c = sun.misc.Unsafe.class.getDeclaredConstructor(new Class[0]); + c.setAccessible(true); + instance = c.newInstance(new Object[0]); + } catch (Throwable t1) { + instance = null; + } + } + if (instance != null) { + osPageSize = instance.pageSize(); + } + OS_PAGE_SIZE = osPageSize; + } + + /** + * The system will change a few logs and semantics to be suitable to + * run a long testsuite. + * Like a few log entries that are only valid during a production system. + * or a few cases we need to know as warn on the testsuite and as log in production. + */ + private static boolean testEnv = false; private static final String OS = System.getProperty("os.name").toLowerCase(); private static final boolean IS_LINUX = OS.startsWith("linux"); @@ -38,6 +67,14 @@ public final class Env { } + /** + * Return the size in bytes of a OS memory page. + * This value will always be a power of two. 
+ */ + public static int osPageSize() { + return OS_PAGE_SIZE; + } + public static boolean isTestEnv() { return testEnv; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21c9ed85/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java index cd15246..f6cb9b0 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java @@ -189,9 +189,12 @@ public abstract class AbstractSequentialFile implements SequentialFile { bytes.setIndex(0, bytes.capacity()); timedBuffer.addBytes(bytes, sync, callback); } else { - ByteBuffer buffer = factory.newBuffer(bytes.capacity()); - buffer.put(bytes.toByteBuffer().array()); - buffer.rewind(); + final int readableBytes = bytes.readableBytes(); + final ByteBuffer buffer = factory.newBuffer(readableBytes); + //factory::newBuffer doesn't necessary return a buffer with limit == readableBytes!! 
+ buffer.limit(readableBytes); + bytes.getBytes(bytes.readerIndex(), buffer); + buffer.flip(); writeDirect(buffer, sync, callback); } } @@ -215,15 +218,12 @@ public abstract class AbstractSequentialFile implements SequentialFile { if (timedBuffer != null) { timedBuffer.addBytes(bytes, sync, callback); } else { - ByteBuffer buffer = factory.newBuffer(bytes.getEncodeSize()); - - // If not using the TimedBuffer, a final copy is necessary - // Because AIO will need a specific Buffer - // And NIO will also need a whole buffer to perform the write - + final int encodedSize = bytes.getEncodeSize(); + ByteBuffer buffer = factory.newBuffer(encodedSize); ActiveMQBuffer outBuffer = ActiveMQBuffers.wrappedBuffer(buffer); bytes.encode(outBuffer); - buffer.rewind(); + buffer.clear(); + buffer.limit(encodedSize); writeDirect(buffer, sync, callback); } } @@ -255,9 +255,10 @@ public abstract class AbstractSequentialFile implements SequentialFile { @Override public void done() { - for (IOCallback callback : delegates) { + final int size = delegates.size(); + for (int i = 0; i < size; i++) { try { - callback.done(); + delegates.get(i).done(); } catch (Throwable e) { ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e); } @@ -266,9 +267,10 @@ public abstract class AbstractSequentialFile implements SequentialFile { @Override public void onError(final int errorCode, final String errorMessage) { - for (IOCallback callback : delegates) { + final int size = delegates.size(); + for (int i = 0; i < size; i++) { try { - callback.onError(errorCode, errorMessage); + delegates.get(i).onError(errorCode, errorMessage); } catch (Throwable e) { ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21c9ed85/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java ---------------------------------------------------------------------- diff --git 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java index 08415b8..ca9a315 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java @@ -18,27 +18,25 @@ package org.apache.activemq.artemis.core.io.buffer; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.LinkedList; +import java.util.Collections; import java.util.List; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; +import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; +import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.journal.EncodingSupport; -import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; -public class TimedBuffer { +public final class TimedBuffer { // Constants ----------------------------------------------------- - // The number of tries on sleep before switching to spin - public static final int MAX_CHECKS_ON_SLEEP = 20; - // Attributes ---------------------------------------------------- private TimedBufferObserver bufferObserver; @@ -58,10 +56,9 @@ public class TimedBuffer { private List<IOCallback> callbacks; - private volatile int timeout; + private final int timeout; - // used to measure sync requests. 
When a sync is requested, it shouldn't take more than timeout to happen - private volatile boolean pendingSync = false; + private final AtomicLong pendingSyncs = new AtomicLong(); private Thread timerThread; @@ -76,7 +73,7 @@ public class TimedBuffer { private final boolean logRates; - private final AtomicLong bytesFlushed = new AtomicLong(0); + private long bytesFlushed = 0; private final AtomicLong flushesDone = new AtomicLong(0); @@ -84,8 +81,6 @@ public class TimedBuffer { private TimerTask logRatesTimerTask; - private boolean useSleep = true; - // no need to be volatile as every access is synchronized private boolean spinning = false; @@ -104,27 +99,18 @@ public class TimedBuffer { logRatesTimer = new Timer(true); } // Setting the interval for nano-sleeps - - buffer = ActiveMQBuffers.fixedBuffer(bufferSize); + //prefer off heap buffer to allow further humongous allocations and reduce GC overhead + buffer = new ChannelBufferWrapper(Unpooled.directBuffer(size, size)); buffer.clear(); bufferLimit = 0; - callbacks = new ArrayList<>(); + callbacks = null; this.timeout = timeout; } - // for Debug purposes - public synchronized boolean isUseSleep() { - return useSleep; - } - - public synchronized void setUseSleep(boolean useSleep) { - this.useSleep = useSleep; - } - public synchronized void start() { if (started) { return; @@ -232,7 +218,28 @@ public class TimedBuffer { } public synchronized void addBytes(final ActiveMQBuffer bytes, final boolean sync, final IOCallback callback) { - addBytes(new ByteArrayEncoding(bytes.toByteBuffer().array()), sync, callback); + if (!started) { + throw new IllegalStateException("TimedBuffer is not started"); + } + + delayFlush = false; + + //it doesn't modify the reader index of bytes as in the original version + final int readableBytes = bytes.readableBytes(); + final int writerIndex = buffer.writerIndex(); + buffer.setBytes(writerIndex, bytes, bytes.readerIndex(), readableBytes); + buffer.writerIndex(writerIndex + readableBytes); + 
+ if (callbacks == null) { + callbacks = new ArrayList<>(); + } + callbacks.add(callback); + + if (sync) { + final long currentPendingSyncs = pendingSyncs.get(); + pendingSyncs.lazySet(currentPendingSyncs + 1); + startSpin(); + } } public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOCallback callback) { @@ -244,11 +251,14 @@ public class TimedBuffer { bytes.encode(buffer); + if (callbacks == null) { + callbacks = new ArrayList<>(); + } callbacks.add(callback); if (sync) { - pendingSync = true; - + final long currentPendingSyncs = pendingSyncs.get(); + pendingSyncs.lazySet(currentPendingSyncs + 1); startSpin(); } @@ -262,45 +272,49 @@ public class TimedBuffer { * force means the Journal is moving to a new file. Any pending write need to be done immediately * or data could be lost */ - public void flush(final boolean force) { + private void flush(final boolean force) { synchronized (this) { if (!started) { throw new IllegalStateException("TimedBuffer is not started"); } if ((force || !delayFlush) && buffer.writerIndex() > 0) { - int pos = buffer.writerIndex(); + final int pos = buffer.writerIndex(); - if (logRates) { - bytesFlushed.addAndGet(pos); - } + final ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos); + //bufferObserver::newBuffer doesn't necessary return a buffer with limit == pos or limit == bufferSize!! + bufferToFlush.limit(pos); + //perform memcpy under the hood due to the off heap buffer + buffer.getBytes(0, bufferToFlush); - ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos); - - // Putting a byteArray on a native buffer is much faster, since it will do in a single native call. 
- // Using bufferToFlush.put(buffer) would make several append calls for each byte - // We also transfer the content of this buffer to the native file's buffer - - bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos); - - bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks); + final List<IOCallback> ioCallbacks = callbacks == null ? Collections.emptyList() : callbacks; + bufferObserver.flushBuffer(bufferToFlush, pendingSyncs.get() > 0, ioCallbacks); stopSpin(); - pendingSync = false; + pendingSyncs.lazySet(0); - // swap the instance as the previous callback list is being used asynchronously - callbacks = new LinkedList<>(); + callbacks = null; buffer.clear(); bufferLimit = 0; - flushesDone.incrementAndGet(); + if (logRates) { + logFlushed(pos); + } } } } + private void logFlushed(int bytes) { + this.bytesFlushed += bytes; + //more lightweight than XADD if single writer + final long currentFlushesDone = flushesDone.get(); + //flushesDone::lazySet write-Release bytesFlushed + flushesDone.lazySet(currentFlushesDone + 1L); + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- @@ -324,21 +338,21 @@ public class TimedBuffer { if (!closed) { long now = System.currentTimeMillis(); - long bytesF = bytesFlushed.get(); - long flushesD = flushesDone.get(); - + final long flushesDone = TimedBuffer.this.flushesDone.get(); + //flushesDone::get read-Acquire bytesFlushed + final long bytesFlushed = TimedBuffer.this.bytesFlushed; if (lastExecution != 0) { - double rate = 1000 * (double) (bytesF - lastBytesFlushed) / (now - lastExecution); + final double rate = 1000 * (double) (bytesFlushed - lastBytesFlushed) / (now - lastExecution); ActiveMQJournalLogger.LOGGER.writeRate(rate, (long) (rate / (1024 * 1024))); - double flushRate = 1000 * (double) (flushesD - lastFlushesDone) / (now - lastExecution); + final double flushRate = 1000 * (double) (flushesDone - lastFlushesDone) / (now - 
lastExecution); ActiveMQJournalLogger.LOGGER.flushRate(flushRate); } lastExecution = now; - lastBytesFlushed = bytesF; + lastBytesFlushed = bytesFlushed; - lastFlushesDone = flushesD; + lastFlushesDone = flushesDone; } } @@ -354,84 +368,40 @@ public class TimedBuffer { private volatile boolean closed = false; - int checks = 0; - int failedChecks = 0; - long timeBefore = 0; - - final int sleepMillis = timeout / 1000000; // truncates - final int sleepNanos = timeout % 1000000; - @Override public void run() { + int waitTimes = 0; long lastFlushTime = 0; + long estimatedOptimalBatch = Runtime.getRuntime().availableProcessors(); + final Semaphore spinLimiter = TimedBuffer.this.spinLimiter; + final long timeout = TimedBuffer.this.timeout; while (!closed) { - // We flush on the timer if there are pending syncs there and we've waited at least one - // timeout since the time of the last flush. - // Effectively flushing "resets" the timer - // On the timeout verification, notice that we ignore the timeout check if we are using sleep - - if (pendingSync) { - if (isUseSleep()) { - // if using sleep, we will always flush - flush(); - lastFlushTime = System.nanoTime(); - } else if (bufferObserver != null && System.nanoTime() > lastFlushTime + timeout) { - // if not using flush we will spin and do the time checks manually - flush(); - lastFlushTime = System.nanoTime(); + boolean flushed = false; + final long currentPendingSyncs = pendingSyncs.get(); + + if (currentPendingSyncs > 0) { + if (bufferObserver != null) { + final boolean checkpoint = System.nanoTime() > lastFlushTime + timeout; + if (checkpoint || currentPendingSyncs >= estimatedOptimalBatch) { + flush(); + if (checkpoint) { + estimatedOptimalBatch = currentPendingSyncs; + } else { + estimatedOptimalBatch = Math.max(estimatedOptimalBatch, currentPendingSyncs); + } + lastFlushTime = System.nanoTime(); + //a flush has been requested + flushed = true; + } } - } - sleepIfPossible(); - - try { - spinLimiter.acquire(); - - 
Thread.yield(); - - spinLimiter.release(); - } catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); - } - } - } - - /** - * We will attempt to use sleep only if the system supports nano-sleep - * we will on that case verify up to MAX_CHECKS if nano sleep is behaving well. - * if more than 50% of the checks have failed we will cancel the sleep and just use regular spin - */ - private void sleepIfPossible() { - if (isUseSleep()) { - if (checks < MAX_CHECKS_ON_SLEEP) { - timeBefore = System.nanoTime(); - } - - try { - sleep(sleepMillis, sleepNanos); - } catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); - } catch (Exception e) { - setUseSleep(false); - ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e); - } - - if (checks < MAX_CHECKS_ON_SLEEP) { - long realTimeSleep = System.nanoTime() - timeBefore; - - // I'm letting the real time to be up to 50% than the requested sleep. - if (realTimeSleep > timeout * 1.5) { - failedChecks++; - } - - if (++checks >= MAX_CHECKS_ON_SLEEP) { - if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) { - ActiveMQJournalLogger.LOGGER.debug("Thread.sleep with nano seconds is not working as expected, Your kernel possibly doesn't support real time. 
the Journal TimedBuffer will spin for timeouts"); - setUseSleep(false); - } - } + if (flushed) { + waitTimes = 0; + } else { + //instead of interruptible sleeping, perform progressive parks depending on the load + waitTimes = TimedBuffer.wait(waitTimes, spinLimiter); } } } @@ -441,15 +411,33 @@ public class TimedBuffer { } } - /** - * Sub classes (tests basically) can use this to override how the sleep is being done - * - * @param sleepMillis - * @param sleepNanos - * @throws InterruptedException - */ - protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException { - Thread.sleep(sleepMillis, sleepNanos); + private static int wait(int waitTimes, Semaphore spinLimiter) { + if (waitTimes < 10) { + //doesn't make sense to spin loop here, because of the lock around flush/addBytes operations! + Thread.yield(); + waitTimes++; + } else if (waitTimes < 20) { + LockSupport.parkNanos(1L); + waitTimes++; + } else if (waitTimes < 50) { + LockSupport.parkNanos(10L); + waitTimes++; + } else if (waitTimes < 100) { + LockSupport.parkNanos(100L); + waitTimes++; + } else if (waitTimes < 1000) { + LockSupport.parkNanos(1000L); + waitTimes++; + } else { + LockSupport.parkNanos(100_000L); + try { + spinLimiter.acquire(); + spinLimiter.release(); + } catch (InterruptedException e) { + throw new ActiveMQInterruptedException(e); + } + } + return waitTimes; } /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21c9ed85/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java index 29e5b81..d1e333e 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java +++ 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java @@ -22,6 +22,7 @@ import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -33,6 +34,8 @@ import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.journal.ActiveMQJournalBundle; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.apache.activemq.artemis.utils.Env; public final class NIOSequentialFile extends AbstractSequentialFile { @@ -40,9 +43,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile { private RandomAccessFile rfile; - private final int defaultMaxIO; - - private int maxIO; + private final int maxIO; public NIOSequentialFile(final SequentialFileFactory factory, final File directory, @@ -50,7 +51,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile { final int maxIO, final Executor writerExecutor) { super(directory, file, factory, writerExecutor); - defaultMaxIO = maxIO; + this.maxIO = maxIO; } @Override @@ -69,7 +70,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile { */ @Override public synchronized void open() throws IOException { - open(defaultMaxIO, true); + open(maxIO, true); } @Override @@ -90,31 +91,38 @@ public final class NIOSequentialFile extends AbstractSequentialFile { @Override public void fill(final int size) throws IOException { - ByteBuffer bb = ByteBuffer.allocate(size); - - bb.limit(size); - bb.position(0); - try { - channel.position(0); - channel.write(bb); - channel.force(false); - channel.position(0); + //uses the most common OS page size to match the Page Cache entry 
size and reduce JVM memory footprint + final int zeroPageCapacity = Env.osPageSize(); + final ByteBuffer zeroPage = this.factory.newBuffer(zeroPageCapacity); + try { + int bytesToWrite = size; + long writePosition = 0; + while (bytesToWrite > 0) { + zeroPage.clear(); + final int zeroPageLimit = Math.min(bytesToWrite, zeroPageCapacity); + zeroPage.limit(zeroPageLimit); + //use the cheaper pwrite instead of fseek + fwrite + final int writtenBytes = channel.write(zeroPage, writePosition); + bytesToWrite -= writtenBytes; + writePosition += writtenBytes; + } + if (factory.isDatasync()) { + channel.force(true); + } + //set the position to 0 to match the fill contract + channel.position(0); + fileSize = size; + } finally { + //return it to the factory + this.factory.releaseBuffer(zeroPage); + } } catch (ClosedChannelException e) { throw e; } catch (IOException e) { factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); throw e; } - channel.force(true); - - fileSize = channel.size(); - } - - public synchronized void waitForClose() throws InterruptedException { - while (isOpen()) { - wait(); - } } @Override @@ -247,10 +255,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile { internalWrite(bytes, sync, null); } - public void writeInternal(final ByteBuffer bytes) throws Exception { - internalWrite(bytes, true, null); - } - @Override protected ByteBuffer newBuffer(int size, final int limit) { // For NIO, we don't need to allocate a buffer the entire size of the timed buffer, unlike AIO @@ -293,14 +297,51 @@ public final class NIOSequentialFile extends AbstractSequentialFile { private void doInternalWrite(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws IOException { - channel.write(bytes); + try { + channel.write(bytes); - if (sync) { - sync(); + if (sync) { + sync(); + } + + if (callback != null) { + callback.done(); + } + } finally { + //release it to recycle the write buffer if big enough 
+ this.factory.releaseBuffer(bytes); } + } - if (callback != null) { - callback.done(); + @Override + public void copyTo(SequentialFile dstFile) throws IOException { + if (ActiveMQJournalLogger.LOGGER.isDebugEnabled()) { + ActiveMQJournalLogger.LOGGER.debug("Copying " + this + " as " + dstFile); + } + if (isOpen()) { + throw new IllegalStateException("File opened!"); + } + if (dstFile.isOpen()) { + throw new IllegalArgumentException("dstFile must be closed too"); + } + try (RandomAccessFile src = new RandomAccessFile(getFile(), "rw"); + FileChannel srcChannel = src.getChannel(); + FileLock srcLock = srcChannel.lock()) { + final long readableBytes = srcChannel.size(); + if (readableBytes > 0) { + try (RandomAccessFile dst = new RandomAccessFile(dstFile.getJavaFile(), "rw"); + FileChannel dstChannel = dst.getChannel(); + FileLock dstLock = dstChannel.lock()) { + final long oldLength = dst.length(); + final long newLength = oldLength + readableBytes; + dst.setLength(newLength); + final long transferred = dstChannel.transferFrom(srcChannel, oldLength, readableBytes); + if (transferred != readableBytes) { + dstChannel.truncate(oldLength); + throw new IOException("copied less then expected"); + } + } + } } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21c9ed85/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java index f90bebf..781176e 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java @@ -19,13 +19,23 @@ package org.apache.activemq.artemis.core.io.nio; import 
java.io.File; import java.lang.ref.WeakReference; import java.nio.ByteBuffer; +import java.util.Arrays; +import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.ArtemisConstants; import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.utils.Env; -public class NIOSequentialFileFactory extends AbstractSequentialFileFactory { +public final class NIOSequentialFileFactory extends AbstractSequentialFileFactory { + + private static final int DEFAULT_CAPACITY_ALIGNMENT = Env.osPageSize(); + + private boolean bufferPooling; + + //pools only the biggest one -> optimized for the common case + private final ThreadLocal<ByteBuffer> bytesPool; public NIOSequentialFileFactory(final File journalDir, final int maxIO) { this(journalDir, null, maxIO); @@ -63,6 +73,8 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory { final boolean logRates, final IOCriticalErrorListener listener) { super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener); + this.bufferPooling = true; + this.bytesPool = new ThreadLocal<>(); } public static ByteBuffer allocateDirectByteBuffer(final int size) { @@ -91,6 +103,14 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory { return buffer2; } + public void enableBufferReuse() { + this.bufferPooling = true; + } + + public void disableBufferReuse() { + this.bufferPooling = false; + } + @Override public SequentialFile createSequentialFile(final String fileName) { return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor); @@ -101,31 +121,75 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory { return timedBuffer != null; } + private static int align(final int value, final int pow2alignment) { + return (value + (pow2alignment - 1)) & 
~(pow2alignment - 1); + } + @Override public ByteBuffer allocateDirectBuffer(final int size) { - return NIOSequentialFileFactory.allocateDirectByteBuffer(size); + final int requiredCapacity = align(size, DEFAULT_CAPACITY_ALIGNMENT); + final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(requiredCapacity); + byteBuffer.limit(size); + return byteBuffer; } @Override public void releaseDirectBuffer(ByteBuffer buffer) { - // nothing we can do on this case. we can just have good faith on GC + PlatformDependent.freeDirectBuffer(buffer); } @Override public ByteBuffer newBuffer(final int size) { - return ByteBuffer.allocate(size); + if (!this.bufferPooling) { + return allocateDirectBuffer(size); + } else { + final int requiredCapacity = align(size, DEFAULT_CAPACITY_ALIGNMENT); + ByteBuffer byteBuffer = bytesPool.get(); + if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) { + //do not free the old one (if any) until the new one will be released into the pool! + byteBuffer = ByteBuffer.allocateDirect(requiredCapacity); + } else { + bytesPool.set(null); + PlatformDependent.setMemory(PlatformDependent.directBufferAddress(byteBuffer), size, (byte) 0); + byteBuffer.clear(); + } + byteBuffer.limit(size); + return byteBuffer; + } } @Override - public void clearBuffer(final ByteBuffer buffer) { - final int limit = buffer.limit(); - buffer.rewind(); - - for (int i = 0; i < limit; i++) { - buffer.put((byte) 0); + public void releaseBuffer(ByteBuffer buffer) { + if (this.bufferPooling) { + if (buffer.isDirect()) { + final ByteBuffer byteBuffer = bytesPool.get(); + if (byteBuffer != buffer) { + //replace with the current pooled only if greater or null + if (byteBuffer == null || buffer.capacity() > byteBuffer.capacity()) { + if (byteBuffer != null) { + //free the smaller one + PlatformDependent.freeDirectBuffer(byteBuffer); + } + bytesPool.set(buffer); + } else { + PlatformDependent.freeDirectBuffer(buffer); + } + } + } + } else if (buffer.isDirect()) { + //not pooled: free the native memory now instead of waiting for the GC to 
reclaim it + PlatformDependent.freeDirectBuffer(buffer); } + } - buffer.rewind(); + @Override + public 
void clearBuffer(final ByteBuffer buffer) { + if (buffer.isDirect()) { + PlatformDependent.setMemory(PlatformDependent.directBufferAddress(buffer), buffer.limit(), (byte) 0); + } else { + //the buffer may not start at index 0 of its backing array (e.g. slices): shift the range by arrayOffset + Arrays.fill(buffer.array(), buffer.arrayOffset(), buffer.arrayOffset() + buffer.limit(), (byte) 0); + } } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21c9ed85/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java index 31cb970..b2f65cd 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java @@ -19,8 +19,6 @@ package org.apache.activemq.artemis.tests.unit.core.journal.impl; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -205,162 +203,4 @@ public class TimedBufferTest extends ActiveMQTestBase { } } - - /** - * This test will verify if the system will switch to spin case the system can't perform sleeps timely - * due to proper kernel installations - * - * @throws Exception - */ - @Test - public void testVerifySwitchToSpin() throws Exception { - class TestObserver implements TimedBufferObserver { - - @Override - public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) { - } - - /* 
(non-Javadoc) - * @see 
org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int) - */ - @Override - public ByteBuffer newBuffer(final int minSize, final int maxSize) { - return ByteBuffer.allocate(maxSize); - } - - @Override - public int getRemainingBytes() { - return 1024 * 1024; - } - } - - final CountDownLatch sleptLatch = new CountDownLatch(1); - - TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS / 1000, false) { - - @Override - protected void stopSpin() { - // keeps spinning forever - } - - @Override - protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException { - Thread.sleep(10); - } - - @Override - public synchronized void setUseSleep(boolean param) { - super.setUseSleep(param); - sleptLatch.countDown(); - } - - }; - - timedBuffer.start(); - - try { - - timedBuffer.setObserver(new TestObserver()); - - int x = 0; - - byte[] bytes = new byte[10]; - for (int j = 0; j < 10; j++) { - bytes[j] = ActiveMQTestBase.getSamplebyte(x++); - } - - ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(bytes); - - timedBuffer.checkSize(10); - timedBuffer.addBytes(buff, true, dummyCallback); - - sleptLatch.await(10, TimeUnit.SECONDS); - - assertFalse(timedBuffer.isUseSleep()); - } finally { - timedBuffer.stop(); - } - - } - - /** - * This test will verify if the system will switch to spin case the system can't perform sleeps timely - * due to proper kernel installations - * - * @throws Exception - */ - @Test - public void testStillSleeps() throws Exception { - class TestObserver implements TimedBufferObserver { - - @Override - public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) { - } - - /* (non-Javadoc) - * @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int) - */ - @Override - public ByteBuffer newBuffer(final int minSize, final int maxSize) { - return ByteBuffer.allocate(maxSize); - } - - @Override - public int 
getRemainingBytes() { - return 1024 * 1024; - } - } - - final CountDownLatch sleptLatch = new CountDownLatch(TimedBuffer.MAX_CHECKS_ON_SLEEP); - - TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS / 1000, false) { - - @Override - protected void stopSpin() { - // keeps spinning forever - } - - @Override - protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException { - sleptLatch.countDown(); - // no sleep - } - - @Override - public synchronized void setUseSleep(boolean param) { - super.setUseSleep(param); - sleptLatch.countDown(); - } - - }; - - timedBuffer.start(); - - try { - - timedBuffer.setObserver(new TestObserver()); - - int x = 0; - - byte[] bytes = new byte[10]; - for (int j = 0; j < 10; j++) { - bytes[j] = ActiveMQTestBase.getSamplebyte(x++); - } - - ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(bytes); - - timedBuffer.checkSize(10); - timedBuffer.addBytes(buff, true, dummyCallback); - - // waits all the sleeps to be done - sleptLatch.await(10, TimeUnit.SECONDS); - - // keeps waiting a bit longer - Thread.sleep(100); - - assertTrue(timedBuffer.isUseSleep()); - } finally { - timedBuffer.stop(); - } - } }
