This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push: new f51c799 ARTEMIS-1811 NIO Seq File should use RandomAccessFile with heap buffers new 5bee113 This closes #2844 f51c799 is described below commit f51c799ac036f948bb59cd084bdd6f4f5fd51e27 Author: Francesco Nigro <nigro....@gmail.com> AuthorDate: Mon Sep 3 13:51:23 2018 +0200 ARTEMIS-1811 NIO Seq File should use RandomAccessFile with heap buffers It uses RandomAccessFile to allow using heap buffers without additional copies and/or leaks of direct buffers, as performed by FileChannel JDK implementation (see https://bugs.openjdk.java.net/browse/JDK-8147468) --- .../artemis/core/io/nio/NIOSequentialFile.java | 93 ++++++++++++-- .../NIONonBufferedSequentialFileFactoryTest.java | 133 +++++++++++++++++++++ 2 files changed, 217 insertions(+), 9 deletions(-) diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java index 4202a21..e5857ba 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java @@ -18,10 +18,10 @@ package org.apache.activemq.artemis.core.io.nio; import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; -import java.nio.file.StandardOpenOption; import java.util.List; import java.util.concurrent.Executor; @@ -42,8 +42,20 @@ import org.apache.activemq.artemis.utils.Env; public class NIOSequentialFile extends AbstractSequentialFile { + /* This value has been tuned just to reduce the memory footprint + of read/write of the whole file size: given that this value + is > 8192, RandomAccessFile JNI code will use malloc/free instead + of using a copy on the stack, but it has 
been proven to NOT be + a bottleneck. + + Instead of reading the whole content in a single operation, this will read in smaller chunks. + */ + private static final int CHUNK_SIZE = 2 * 1024 * 1024; + private FileChannel channel; + private RandomAccessFile rfile; + private final int maxIO; public NIOSequentialFile(final SequentialFileFactory factory, @@ -82,7 +94,9 @@ public class NIOSequentialFile extends AbstractSequentialFile { @Override public void open(final int maxIO, final boolean useExecutor) throws IOException { try { - channel = FileChannel.open(getFile().toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ); + rfile = new RandomAccessFile(getFile(), "rw"); + + channel = rfile.getChannel(); fileSize = channel.size(); } catch (ClosedChannelException e) { @@ -139,18 +153,27 @@ public class NIOSequentialFile extends AbstractSequentialFile { super.close(); try { - if (channel != null) { - if (waitSync && factory.isDatasync()) - channel.force(false); - channel.close(); + try { + if (channel != null) { + if (waitSync && factory.isDatasync()) + channel.force(false); + channel.close(); + } + } finally { + if (rfile != null) { + rfile.close(); + } } } catch (ClosedChannelException e) { throw e; } catch (IOException e) { factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); throw e; + } finally { + channel = null; + rfile = null; } - channel = null; + notifyAll(); } @@ -160,6 +183,37 @@ public class NIOSequentialFile extends AbstractSequentialFile { return read(bytes, null); } + + private static int readRafInChunks(RandomAccessFile raf, byte[] b, int off, int len) throws IOException { + int remaining = len; + int offset = off; + while (remaining > 0) { + final int chunkSize = Math.min(CHUNK_SIZE, remaining); + final int read = raf.read(b, offset, chunkSize); + assert read != 0; + if (read == -1) { + if (len == remaining) { + return -1; + } + break; + } + offset += read; + remaining -= read; + } + 
return len - remaining; + } + + private static void writeRafInChunks(RandomAccessFile raf, byte[] b, int off, int len) throws IOException { + int remaining = len; + int offset = off; + while (remaining > 0) { + final int chunkSize = Math.min(CHUNK_SIZE, remaining); + raf.write(b, offset, chunkSize); + offset += chunkSize; + remaining -= chunkSize; + } + } + @Override public synchronized int read(final ByteBuffer bytes, final IOCallback callback) throws IOException, ActiveMQIllegalStateException { @@ -167,7 +221,19 @@ public class NIOSequentialFile extends AbstractSequentialFile { if (channel == null) { throw new ActiveMQIllegalStateException("File " + this.getFileName() + " has a null channel"); } - int bytesRead = channel.read(bytes); + final int bytesRead; + if (bytes.hasArray()) { + if (bytes.remaining() > CHUNK_SIZE) { + bytesRead = readRafInChunks(rfile, bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining()); + } else { + bytesRead = rfile.read(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining()); + } + if (bytesRead > 0) { + bytes.position(bytes.position() + bytesRead); + } + } else { + bytesRead = channel.read(bytes); + } if (callback != null) { callback.done(); @@ -310,7 +376,16 @@ public class NIOSequentialFile extends AbstractSequentialFile { final IOCallback callback, boolean releaseBuffer) throws IOException { try { - channel.write(bytes); + if (bytes.hasArray()) { + if (bytes.remaining() > CHUNK_SIZE) { + writeRafInChunks(rfile, bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining()); + } else { + rfile.write(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining()); + } + bytes.position(bytes.limit()); + } else { + channel.write(bytes); + } if (sync) { sync(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIONonBufferedSequentialFileFactoryTest.java 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIONonBufferedSequentialFileFactoryTest.java index f793a5c..a6f1e09 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIONonBufferedSequentialFileFactoryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIONonBufferedSequentialFileFactoryTest.java @@ -17,10 +17,15 @@ package org.apache.activemq.artemis.tests.integration.journal; import java.io.File; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.tests.unit.core.journal.impl.SequentialFileFactoryTestBase; +import org.junit.Assert; +import org.junit.Test; public class NIONonBufferedSequentialFileFactoryTest extends SequentialFileFactoryTestBase { @@ -29,4 +34,132 @@ public class NIONonBufferedSequentialFileFactoryTest extends SequentialFileFacto return new NIOSequentialFileFactory(new File(folder), false, 1); } + @Test + public void writeHeapBufferNotFromBeginningAndReadWithDirectBuffer() throws Exception { + writeHeapBufferNotFromBeginningAndRead(false); + } + + @Test + public void writeHeapBufferNotFromBeginningAndReadWithHeapBuffer() throws Exception { + writeHeapBufferNotFromBeginningAndRead(true); + } + + private void writeHeapBufferNotFromBeginningAndRead(boolean useHeapByteBufferToRead) throws Exception { + final SequentialFile file = factory.createSequentialFile("write.amq"); + file.open(); + Assert.assertEquals(0, file.size()); + Assert.assertEquals(0, file.position()); + try { + final String data = "writeDirectArray"; + final byte[] bytes = data.getBytes(StandardCharsets.UTF_8); + file.position(factory.calculateBlockSize(bytes.length)); + 
file.writeDirect(ByteBuffer.wrap(bytes), false); + final ByteBuffer readBuffer; + if (!useHeapByteBufferToRead) { + readBuffer = factory.newBuffer(bytes.length); + } else { + readBuffer = ByteBuffer.allocate(bytes.length); + } + try { + file.position(factory.calculateBlockSize(bytes.length)); + Assert.assertEquals(factory.calculateBlockSize(bytes.length), file.read(readBuffer)); + Assert.assertEquals(data, StandardCharsets.UTF_8.decode(readBuffer).toString()); + } finally { + if (!useHeapByteBufferToRead) { + factory.releaseBuffer(readBuffer); + } + } + } finally { + file.close(); + file.delete(); + } + } + + @Test + public void writeHeapBufferAndReadWithDirectBuffer() throws Exception { + writeHeapBufferAndRead(false); + } + + @Test + public void writeHeapBufferAndReadWithHeapBuffer() throws Exception { + writeHeapBufferAndRead(true); + } + + private void writeHeapBufferAndRead(boolean useHeapByteBufferToRead) throws Exception { + final SequentialFile file = factory.createSequentialFile("write.amq"); + file.open(); + Assert.assertEquals(0, file.size()); + Assert.assertEquals(0, file.position()); + try { + final String data = "writeDirectArray"; + final byte[] bytes = data.getBytes(StandardCharsets.UTF_8); + file.writeDirect(ByteBuffer.wrap(bytes), false); + final ByteBuffer readBuffer; + if (!useHeapByteBufferToRead) { + readBuffer = factory.newBuffer(bytes.length); + } else { + readBuffer = ByteBuffer.allocate(bytes.length); + } + try { + file.position(0); + Assert.assertEquals(factory.calculateBlockSize(bytes.length), file.read(readBuffer)); + Assert.assertEquals(data, StandardCharsets.UTF_8.decode(readBuffer).toString()); + } finally { + if (!useHeapByteBufferToRead) { + factory.releaseBuffer(readBuffer); + } + } + } finally { + file.close(); + file.delete(); + } + } + + @Test + public void writeHeapAndDirectBufferAndReadWithDirectBuffer() throws Exception { + writeHeapAndDirectBufferAndRead(false); + } + + @Test + public void 
writeHeapAndDirectBufferAndReadWithHeapBuffer() throws Exception { + writeHeapAndDirectBufferAndRead(true); + } + + private void writeHeapAndDirectBufferAndRead(boolean useHeapByteBufferToRead) throws Exception { + final SequentialFile file = factory.createSequentialFile("write.amq"); + file.open(); + Assert.assertEquals(0, file.size()); + Assert.assertEquals(0, file.position()); + try { + final String data = "writeDirectArray"; + final byte[] bytes = data.getBytes(StandardCharsets.UTF_8); + file.writeDirect(ByteBuffer.wrap(bytes), false); + final ByteBuffer byteBuffer = factory.newBuffer(bytes.length); + byteBuffer.put(bytes); + byteBuffer.flip(); + file.writeDirect(byteBuffer, false); + final ByteBuffer readBuffer; + if (!useHeapByteBufferToRead) { + readBuffer = factory.newBuffer(bytes.length); + } else { + readBuffer = ByteBuffer.allocate(bytes.length); + } + try { + file.position(0); + Assert.assertEquals(factory.calculateBlockSize(bytes.length), file.read(readBuffer)); + Assert.assertEquals(data, StandardCharsets.UTF_8.decode(readBuffer).toString()); + readBuffer.flip(); + Assert.assertEquals(factory.calculateBlockSize(bytes.length), file.read(readBuffer)); + Assert.assertEquals(data, StandardCharsets.UTF_8.decode(readBuffer).toString()); + } finally { + if (!useHeapByteBufferToRead) { + factory.releaseBuffer(readBuffer); + } + } + } finally { + file.close(); + file.delete(); + } + } + }