This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 89c02f1 ARTEMIS-2211 Refactor ByteBuffer pooling, alignment and
zeroing
new 896142b This closes #2479
89c02f1 is described below
commit 89c02f1cc013181b688ae3be0d9639bd25544dd7
Author: Francesco Nigro <[email protected]>
AuthorDate: Thu Dec 20 11:11:36 2018 +0100
ARTEMIS-2211 Refactor ByteBuffer pooling, alignment and zeroing
Refactored thread local ByteBuffer pooling, alignment
and zeroing in order to avoid duplicate code and
improve code coverage with tests.
In addition are being provided faster branchless
alignment operations and optional zeroing of
pooled ByteBuffers for both ASYNCIO and
NIO/MAPPED journal types.
---
.../apache/activemq/artemis/utils/ByteUtil.java | 50 +++++
.../activemq/artemis/utils/PowerOf2Util.java | 38 ++--
.../activemq/artemis/utils/ByteUtilTest.java | 218 +++++++++++++++++++++
.../activemq/artemis/utils/PowerOf2UtilTest.java | 44 +++++
.../artemis/core/io/aio/AIOSequentialFile.java | 6 +-
.../core/io/aio/AIOSequentialFileFactory.java | 73 ++++---
.../artemis/core/io/mapped/MappedFile.java | 3 +-
.../io/mapped/MappedSequentialFileFactory.java | 55 +-----
.../core/io/nio/NIOSequentialFileFactory.java | 44 +----
.../artemis/core/io/util/ByteBufferPool.java | 51 +++++
.../core/io/util/ThreadLocalByteBufferPool.java | 80 ++++++++
.../io/util/ThreadLocalByteBufferPoolTest.java | 174 ++++++++++++++++
12 files changed, 693 insertions(+), 143 deletions(-)
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
index bf0ac87..88c6d49 100644
---
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
@@ -17,6 +17,8 @@
package org.apache.activemq.artemis.utils;
import java.nio.ByteBuffer;
+import java.nio.ReadOnlyBufferException;
+import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -342,4 +344,52 @@ public class ByteUtil {
public static int intFromBytes(byte b1, byte b2, byte b3, byte b4) {
return b1 << 24 | (b2 & 0xFF) << 16 | (b3 & 0xFF) << 8 | (b4 & 0xFF);
}
+
+ /**
+ * It zeroes the whole {@link ByteBuffer#capacity()} of the given {@code
buffer}.
+ *
+ * @throws ReadOnlyBufferException if {@code buffer} is read-only
+ */
+ public static void zeros(final ByteBuffer buffer) {
+ uncheckedZeros(buffer, 0, buffer.capacity());
+ }
+
+ /**
+ * It zeroes {@code bytes} of the given {@code buffer}, starting
(inclusive) from {@code offset}.
+ *
+ * @throws IndexOutOfBoundsException if {@code offset + bytes > }{@link
ByteBuffer#capacity()} or {@code offset >= }{@link ByteBuffer#capacity()}
+ * @throws IllegalArgumentException if {@code offset} or {@code capacity}
are less then 0
+ * @throws ReadOnlyBufferException if {@code buffer} is read-only
+ */
+ public static void zeros(final ByteBuffer buffer, int offset, int bytes) {
+ if (offset < 0 || bytes < 0) {
+ throw new IllegalArgumentException();
+ }
+ final int capacity = buffer.capacity();
+ if (offset >= capacity || (offset + bytes) > capacity) {
+ throw new IndexOutOfBoundsException();
+ }
+ uncheckedZeros(buffer, offset, bytes);
+ }
+
+ private static void uncheckedZeros(final ByteBuffer buffer, int offset, int
bytes) {
+ if (buffer.isReadOnly()) {
+ throw new ReadOnlyBufferException();
+ }
+ final byte zero = (byte) 0;
+ if (buffer.isDirect() && PlatformDependent.hasUnsafe()) {
+
PlatformDependent.setMemory(PlatformDependent.directBufferAddress(buffer) +
offset, bytes, zero);
+ } else if (buffer.hasArray()) {
+ //SIMD OPTIMIZATION
+ final int arrayOffset = buffer.arrayOffset();
+ final int start = arrayOffset + offset;
+ Arrays.fill(buffer.array(), start, start + bytes, zero);
+ } else {
+ //slow path
+ for (int i = 0; i < bytes; i++) {
+ buffer.put(i + offset, zero);
+ }
+ }
+ }
+
}
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/PowerOf2Util.java
similarity index 66%
rename from
artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java
rename to
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/PowerOf2Util.java
index 986b698..fd60402 100644
---
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/PowerOf2Util.java
@@ -14,19 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.artemis.core.io.mapped;
+package org.apache.activemq.artemis.utils;
-import java.nio.ByteBuffer;
-
-import io.netty.util.internal.PlatformDependent;
-
-final class BytesUtils {
+/**
+ * Collection of bit-tricks for power of 2 cases.
+ */
+public final class PowerOf2Util {
- private BytesUtils() {
+ private PowerOf2Util() {
}
- public static long align(final long value, final long alignment) {
- return (value + (alignment - 1)) & ~(alignment - 1);
+ /**
+ * Fast alignment operation with power of 2 {@code alignment} and {@code
value >=0} and {@code value <}{@link Integer#MAX_VALUE}.<br>
+ * In order to be fast is up to the caller to check arguments correctness.
+ * Original algorithm is on
https://en.wikipedia.org/wiki/Data_structure_alignment.
+ */
+ public static int align(final int value, final int pow2alignment) {
+ return (value + (pow2alignment - 1)) & ~(pow2alignment - 1);
}
/**
@@ -54,20 +58,4 @@ final class BytesUtils {
return (value & (pow2alignment - 1)) == 0;
}
- public static void zerosDirect(final ByteBuffer buffer) {
- //DANGEROUS!! erases bound-checking using directly addresses -> safe
only if it use counted loops
- int remaining = buffer.capacity();
- long address = PlatformDependent.directBufferAddress(buffer);
- while (remaining >= 8) {
- PlatformDependent.putLong(address, 0L);
- address += 8;
- remaining -= 8;
- }
- while (remaining > 0) {
- PlatformDependent.putByte(address, (byte) 0);
- address++;
- remaining--;
- }
- }
-
}
diff --git
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
index 80aa7e5..000ae56 100644
---
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
+++
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
@@ -17,6 +17,11 @@
package org.apache.activemq.artemis.utils;
import java.nio.ByteBuffer;
+import java.nio.ByteBuffer;
+import java.nio.ReadOnlyBufferException;
+import java.util.Arrays;
+
+import io.netty.util.internal.PlatformDependent;
import org.junit.Assert;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -87,6 +92,219 @@ public class ByteUtilTest {
}
}
+ private static byte[] duplicateRemaining(ByteBuffer buffer, int offset, int
bytes) {
+ final int end = offset + bytes;
+ final int expectedRemaining = buffer.capacity() - end;
+ //it is handling the case of <0 just to allow from to > capacity
+ if (expectedRemaining <= 0) {
+ return null;
+ }
+ final byte[] remaining = new byte[expectedRemaining];
+ final ByteBuffer duplicate = buffer.duplicate();
+ duplicate.clear().position(end);
+ duplicate.get(remaining, 0, expectedRemaining);
+ return remaining;
+ }
+
+ private static byte[] duplicateBefore(ByteBuffer buffer, int offset) {
+ if (offset <= 0) {
+ return null;
+ }
+ final int size = Math.min(buffer.capacity(), offset);
+ final byte[] remaining = new byte[size];
+ final ByteBuffer duplicate = buffer.duplicate();
+ duplicate.clear();
+ duplicate.get(remaining, 0, size);
+ return remaining;
+ }
+
+ private static void shouldZeroesByteBuffer(ByteBuffer buffer, int offset,
int bytes) {
+ final byte[] originalBefore = duplicateBefore(buffer, offset);
+ final byte[] originalRemaining = duplicateRemaining(buffer, offset,
bytes);
+ final int position = buffer.position();
+ final int limit = buffer.limit();
+ ByteUtil.zeros(buffer, offset, bytes);
+ Assert.assertEquals(position, buffer.position());
+ Assert.assertEquals(limit, buffer.limit());
+ final byte[] zeros = new byte[bytes];
+ final byte[] content = new byte[bytes];
+ final ByteBuffer duplicate = buffer.duplicate();
+ duplicate.clear().position(offset);
+ duplicate.get(content, 0, bytes);
+ Assert.assertArrayEquals(zeros, content);
+ if (originalRemaining != null) {
+ final byte[] remaining = new byte[duplicate.remaining()];
+ //duplicate position has been moved of bytes
+ duplicate.get(remaining);
+ Assert.assertArrayEquals(originalRemaining, remaining);
+ }
+ if (originalBefore != null) {
+ final byte[] before = new byte[offset];
+ //duplicate position has been moved of bytes: need to reset it
+ duplicate.position(0);
+ duplicate.get(before);
+ Assert.assertArrayEquals(originalBefore, before);
+ }
+ }
+
+ private ByteBuffer fill(ByteBuffer buffer, int offset, int length, byte
value) {
+ for (int i = 0; i < length; i++) {
+ buffer.put(offset + i, value);
+ }
+ return buffer;
+ }
+
+ @Test
+ public void shouldZeroesDirectByteBuffer() {
+ final byte one = (byte) 1;
+ final int capacity = 64;
+ final int bytes = 32;
+ final int offset = 1;
+ final ByteBuffer buffer = ByteBuffer.allocateDirect(capacity);
+ try {
+ fill(buffer, 0, capacity, one);
+ shouldZeroesByteBuffer(buffer, offset, bytes);
+ } finally {
+ if (PlatformDependent.hasUnsafe()) {
+ PlatformDependent.freeDirectBuffer(buffer);
+ }
+ }
+ }
+
+ @Test
+ public void shouldZeroesLimitedDirectByteBuffer() {
+ final byte one = (byte) 1;
+ final int capacity = 64;
+ final int bytes = 32;
+ final int offset = 1;
+ final ByteBuffer buffer = ByteBuffer.allocateDirect(capacity);
+ try {
+ fill(buffer, 0, capacity, one);
+ buffer.limit(0);
+ shouldZeroesByteBuffer(buffer, offset, bytes);
+ } finally {
+ if (PlatformDependent.hasUnsafe()) {
+ PlatformDependent.freeDirectBuffer(buffer);
+ }
+ }
+ }
+
+ @Test
+ public void shouldZeroesHeapByteBuffer() {
+ final byte one = (byte) 1;
+ final int capacity = 64;
+ final int bytes = 32;
+ final int offset = 1;
+ final ByteBuffer buffer = ByteBuffer.allocate(capacity);
+ fill(buffer, 0, capacity, one);
+ shouldZeroesByteBuffer(buffer, offset, bytes);
+ }
+
+ @Test
+ public void shouldZeroesLimitedHeapByteBuffer() {
+ final byte one = (byte) 1;
+ final int capacity = 64;
+ final int bytes = 32;
+ final int offset = 1;
+ final ByteBuffer buffer = ByteBuffer.allocate(capacity);
+ fill(buffer, 0, capacity, one);
+ buffer.limit(0);
+ shouldZeroesByteBuffer(buffer, offset, bytes);
+ }
+
+ @Test(expected = ReadOnlyBufferException.class)
+ public void shouldFailWithReadOnlyHeapByteBuffer() {
+ final byte one = (byte) 1;
+ final int capacity = 64;
+ final int bytes = 32;
+ final int offset = 1;
+ ByteBuffer buffer = ByteBuffer.allocate(capacity);
+ fill(buffer, 0, capacity, one);
+ buffer = buffer.asReadOnlyBuffer();
+ shouldZeroesByteBuffer(buffer, offset, bytes);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void shouldFailIfOffsetIsGreaterOrEqualHeapByteBufferCapacity() {
+ final byte one = (byte) 1;
+ final int capacity = 64;
+ final int bytes = 0;
+ final int offset = 64;
+ ByteBuffer buffer = ByteBuffer.allocate(capacity);
+ fill(buffer, 0, capacity, one);
+ try {
+ shouldZeroesByteBuffer(buffer, offset, bytes);
+ } catch (IndexOutOfBoundsException expectedEx) {
+ //verify that the buffer hasn't changed
+ final byte[] originalContent = duplicateRemaining(buffer, 0, 0);
+ final byte[] expectedContent = new byte[capacity];
+ Arrays.fill(expectedContent, one);
+ Assert.assertArrayEquals(expectedContent, originalContent);
+ throw expectedEx;
+ }
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldFailIfOffsetIsNegative() {
+ final byte one = (byte) 1;
+ final int capacity = 64;
+ final int bytes = 1;
+ final int offset = -1;
+ ByteBuffer buffer = ByteBuffer.allocate(capacity);
+ fill(buffer, 0, capacity, one);
+ try {
+ shouldZeroesByteBuffer(buffer, offset, bytes);
+ } catch (IndexOutOfBoundsException expectedEx) {
+ //verify that the buffer hasn't changed
+ final byte[] originalContent = duplicateRemaining(buffer, 0, 0);
+ final byte[] expectedContent = new byte[capacity];
+ Arrays.fill(expectedContent, one);
+ Assert.assertArrayEquals(expectedContent, originalContent);
+ throw expectedEx;
+ }
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldFailIfBytesIsNegative() {
+ final byte one = (byte) 1;
+ final int capacity = 64;
+ final int bytes = -1;
+ final int offset = 0;
+ ByteBuffer buffer = ByteBuffer.allocate(capacity);
+ fill(buffer, 0, capacity, one);
+ try {
+ shouldZeroesByteBuffer(buffer, offset, bytes);
+ } catch (IndexOutOfBoundsException expectedEx) {
+ //verify that the buffer hasn't changed
+ final byte[] originalContent = duplicateRemaining(buffer, 0, 0);
+ final byte[] expectedContent = new byte[capacity];
+ Arrays.fill(expectedContent, one);
+ Assert.assertArrayEquals(expectedContent, originalContent);
+ throw expectedEx;
+ }
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void shouldFailIfExceedingHeapByteBufferCapacity() {
+ final byte one = (byte) 1;
+ final int capacity = 64;
+ final int bytes = 65;
+ final int offset = 1;
+ ByteBuffer buffer = ByteBuffer.allocate(capacity);
+ fill(buffer, 0, capacity, one);
+ try {
+ shouldZeroesByteBuffer(buffer, offset, bytes);
+ } catch (IndexOutOfBoundsException expectedEx) {
+ //verify that the buffer hasn't changed
+ final byte[] originalContent = duplicateRemaining(buffer, 0, 0);
+ final byte[] expectedContent = new byte[capacity];
+ Arrays.fill(expectedContent, one);
+ Assert.assertArrayEquals(expectedContent, originalContent);
+ throw expectedEx;
+ }
+ }
+
+
@Test
public void testIntToByte() {
diff --git
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/PowerOf2UtilTest.java
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/PowerOf2UtilTest.java
new file mode 100644
index 0000000..d024f2e
--- /dev/null
+++
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/PowerOf2UtilTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+
+import static org.apache.activemq.artemis.utils.PowerOf2Util.align;
+
+public class PowerOf2UtilTest {
+
+ @Test
+ public void shouldAlignToNextMultipleOfAlignment() {
+ final int alignment = 512;
+ assertThat(align(0, alignment), is(0));
+ assertThat(align(1, alignment), is(alignment));
+ assertThat(align(alignment, alignment), is(alignment));
+ assertThat(align(alignment + 1, alignment), is(alignment * 2));
+
+ final int remainder = Integer.MAX_VALUE % alignment;
+ final int alignedMax = Integer.MAX_VALUE - remainder;
+ assertThat(align(alignedMax, alignment), is(alignedMax));
+ //given that Integer.MAX_VALUE is the max value that can be represented
with int
+ //the aligned value would be > 2^32, but (int)(2^32) = Integer.MIN_VALUE
due to the sign bit
+ assertThat(align(Integer.MAX_VALUE, alignment), is(Integer.MIN_VALUE));
+ }
+
+}
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
index 074bebf..793dc70 100644
---
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
+++
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
@@ -85,11 +85,7 @@ public class AIOSequentialFile extends
AbstractSequentialFile {
@Override
public int calculateBlockStart(final int position) {
- int alignment = factory.getAlignment();
-
- int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) *
alignment;
-
- return pos;
+ return factory.calculateBlockSize(position);
}
@Override
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
index 0935026..cc3c91c 100644
---
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
+++
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo;
import org.apache.activemq.artemis.nativo.jlibaio.util.CallbackCache;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+import org.apache.activemq.artemis.utils.PowerOf2Util;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.jboss.logging.Logger;
@@ -163,13 +164,10 @@ public final class AIOSequentialFileFactory extends
AbstractSequentialFileFactor
@Override
public ByteBuffer allocateDirectBuffer(final int size) {
- int blocks = size / getAlignment();
- if (size % getAlignment() != 0) {
- blocks++;
- }
+ final int alignedSize = calculateBlockSize(size);
// The buffer on AIO has to be a multiple of getAlignment()
- ByteBuffer buffer = LibaioContext.newAlignedBuffer(blocks *
getAlignment(), getAlignment());
+ ByteBuffer buffer = LibaioContext.newAlignedBuffer(alignedSize,
getAlignment());
buffer.limit(size);
@@ -183,11 +181,8 @@ public final class AIOSequentialFileFactory extends
AbstractSequentialFileFactor
@Override
public ByteBuffer newBuffer(int size) {
- if (size % getAlignment() != 0) {
- size = (size / getAlignment() + 1) * getAlignment();
- }
-
- return buffersControl.newBuffer(size);
+ final int alignedSize = calculateBlockSize(size);
+ return buffersControl.newBuffer(alignedSize, true);
}
@Override
@@ -199,22 +194,26 @@ public final class AIOSequentialFileFactory extends
AbstractSequentialFileFactor
@Override
public int getAlignment() {
if (alignment < 0) {
+ alignment = calculateAlignment(journalDir);
+ }
+ return alignment;
+ }
- File checkFile = null;
-
- try {
- journalDir.mkdirs();
- checkFile = File.createTempFile("journalCheck", ".tmp",
journalDir);
- checkFile.mkdirs();
- checkFile.createNewFile();
- alignment = LibaioContext.getBlockSize(checkFile);
- } catch (Throwable e) {
- logger.warn(e.getMessage(), e);
- alignment = 512;
- } finally {
- if (checkFile != null) {
- checkFile.delete();
- }
+ private static int calculateAlignment(File journalDir) {
+ File checkFile = null;
+ int alignment;
+ try {
+ journalDir.mkdirs();
+ checkFile = File.createTempFile("journalCheck", ".tmp", journalDir);
+ checkFile.mkdirs();
+ checkFile.createNewFile();
+ alignment = LibaioContext.getBlockSize(checkFile);
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ alignment = 512;
+ } finally {
+ if (checkFile != null) {
+ checkFile.delete();
}
}
return alignment;
@@ -230,11 +229,19 @@ public final class AIOSequentialFileFactory extends
AbstractSequentialFileFactor
@Override
public int calculateBlockSize(final int position) {
- int alignment = getAlignment();
-
- int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) *
alignment;
+ final int alignment = getAlignment();
+ if (!PowerOf2Util.isPowOf2(alignment)) {
+ return align(position, alignment);
+ } else {
+ return PowerOf2Util.align(position, alignment);
+ }
+ }
- return pos;
+ /**
+ * It can be used to align {@code size} if alignment is not a power of 2:
otherwise better to use {@link PowerOf2Util#align(int, int)} instead.
+ */
+ private static int align(int size, int alignment) {
+ return (size / alignment + (size % alignment != 0 ? 1 : 0)) * alignment;
}
/* (non-Javadoc)
@@ -442,7 +449,7 @@ public final class AIOSequentialFileFactory extends
AbstractSequentialFileFactor
return alignedBufferSize;
}
- public ByteBuffer newBuffer(final int size) {
+ public ByteBuffer newBuffer(final int size, final boolean zeroed) {
// if a new buffer wasn't requested in 10 seconds, we clear the queue
// This is being done this way as we don't need another Timeout Thread
// just to cleanup this
@@ -481,7 +488,11 @@ public final class AIOSequentialFileFactory extends
AbstractSequentialFileFactor
buffer.limit(calculateBlockSize(size));
} else {
- clearBuffer(buffer);
+ if (zeroed) {
+ clearBuffer(buffer);
+ } else {
+ buffer.position(0);
+ }
// set the limit of the buffer to the bufferSize being required
buffer.limit(calculateBlockSize(size));
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
index 05f8f25..63a38df 100644
---
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
+++
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
@@ -28,6 +28,7 @@ import io.netty.buffer.Unpooled;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.utils.PowerOf2Util;
import org.apache.activemq.artemis.utils.Env;
final class MappedFile implements AutoCloseable {
@@ -196,7 +197,7 @@ final class MappedFile implements AutoCloseable {
}
//any that will enter has lastZeroed OS page aligned
while (toZeros >= OS_PAGE_SIZE) {
- assert BytesUtils.isAligned(lastZeroed, OS_PAGE_SIZE);/**/
+ assert PowerOf2Util.isAligned(lastZeroed, OS_PAGE_SIZE);/**/
final long startPage = lastZeroed - OS_PAGE_SIZE;
PlatformDependent.setMemory(startPage, OS_PAGE_SIZE, (byte) 0);
lastZeroed = startPage;
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
index 2cdaba1..1d7d6ba 100644
---
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
+++
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
@@ -18,20 +18,21 @@ package org.apache.activemq.artemis.core.io.mapped;
import java.io.File;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.util.ByteBufferPool;
+import org.apache.activemq.artemis.utils.PowerOf2Util;
+import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.Env;
public final class MappedSequentialFileFactory extends
AbstractSequentialFileFactory {
private int capacity;
private boolean bufferPooling;
- //pools only the biggest one -> optimized for the common case
- private final ThreadLocal<ByteBuffer> bytesPool;
+ private final ByteBufferPool bytesPool;
public MappedSequentialFileFactory(File directory,
int capacity,
@@ -47,7 +48,7 @@ public final class MappedSequentialFileFactory extends
AbstractSequentialFileFac
this.capacity = capacity;
this.setDatasync(true);
this.bufferPooling = true;
- this.bytesPool = new ThreadLocal<>();
+ this.bytesPool = ByteBufferPool.threadLocal(true);
}
public MappedSequentialFileFactory capacity(int capacity) {
@@ -76,7 +77,7 @@ public final class MappedSequentialFileFactory extends
AbstractSequentialFileFac
@Override
public ByteBuffer allocateDirectBuffer(final int size) {
- final int requiredCapacity = (int) BytesUtils.align(size,
Env.osPageSize());
+ final int requiredCapacity = PowerOf2Util.align(size, Env.osPageSize());
final ByteBuffer byteBuffer =
ByteBuffer.allocateDirect(requiredCapacity);
byteBuffer.limit(size);
return byteBuffer;
@@ -98,43 +99,18 @@ public final class MappedSequentialFileFactory extends
AbstractSequentialFileFac
}
@Override
- public ByteBuffer newBuffer(final int size) {
+ public ByteBuffer newBuffer(int size) {
if (!this.bufferPooling) {
return allocateDirectBuffer(size);
} else {
- final int requiredCapacity = (int) BytesUtils.align(size,
Env.osPageSize());
- ByteBuffer byteBuffer = bytesPool.get();
- if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) {
- //do not free the old one (if any) until the new one will be
released into the pool!
- byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
- } else {
- bytesPool.set(null);
-
PlatformDependent.setMemory(PlatformDependent.directBufferAddress(byteBuffer),
size, (byte) 0);
- byteBuffer.clear();
- }
- byteBuffer.limit(size);
- return byteBuffer;
+ return bytesPool.borrow(size, true);
}
}
@Override
public void releaseBuffer(ByteBuffer buffer) {
if (this.bufferPooling) {
- if (buffer.isDirect()) {
- final ByteBuffer byteBuffer = bytesPool.get();
- if (byteBuffer != buffer) {
- //replace with the current pooled only if greater or null
- if (byteBuffer == null || buffer.capacity() >
byteBuffer.capacity()) {
- if (byteBuffer != null) {
- //free the smaller one
- PlatformDependent.freeDirectBuffer(byteBuffer);
- }
- bytesPool.set(buffer);
- } else {
- PlatformDependent.freeDirectBuffer(buffer);
- }
- }
- }
+ bytesPool.release(buffer);
}
}
@@ -168,18 +144,7 @@ public final class MappedSequentialFileFactory extends
AbstractSequentialFileFac
@Override
public void clearBuffer(final ByteBuffer buffer) {
- if (buffer.isDirect()) {
- BytesUtils.zerosDirect(buffer);
- } else if (buffer.hasArray()) {
- final byte[] array = buffer.array();
- //SIMD OPTIMIZATION
- Arrays.fill(array, (byte) 0);
- } else {
- final int capacity = buffer.capacity();
- for (int i = 0; i < capacity; i++) {
- buffer.put(i, (byte) 0);
- }
- }
+ ByteUtil.zeros(buffer);
buffer.rewind();
}
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
index c142377..f8f5971 100644
---
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
+++
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
@@ -26,6 +26,8 @@ import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.util.ByteBufferPool;
+import org.apache.activemq.artemis.utils.PowerOf2Util;
import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
@@ -35,8 +37,7 @@ public class NIOSequentialFileFactory extends
AbstractSequentialFileFactory {
private boolean bufferPooling;
- //pools only the biggest one -> optimized for the common case
- private final ThreadLocal<ByteBuffer> bytesPool;
+ private final ByteBufferPool bytesPool;
public NIOSequentialFileFactory(final File journalDir, final int maxIO) {
this(journalDir, null, maxIO);
@@ -76,7 +77,7 @@ public class NIOSequentialFileFactory extends
AbstractSequentialFileFactory {
final CriticalAnalyzer analyzer) {
super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates,
listener, analyzer);
this.bufferPooling = true;
- this.bytesPool = new ThreadLocal<>();
+ this.bytesPool = ByteBufferPool.threadLocal(true);
}
public static ByteBuffer allocateDirectByteBuffer(final int size) {
@@ -123,13 +124,9 @@ public class NIOSequentialFileFactory extends
AbstractSequentialFileFactory {
return timedBuffer != null;
}
- private static int align(final int value, final int pow2alignment) {
- return (value + (pow2alignment - 1)) & ~(pow2alignment - 1);
- }
-
@Override
public ByteBuffer allocateDirectBuffer(final int size) {
- final int requiredCapacity = align(size, DEFAULT_CAPACITY_ALIGNMENT);
+ final int requiredCapacity = PowerOf2Util.align(size,
DEFAULT_CAPACITY_ALIGNMENT);
final ByteBuffer byteBuffer =
ByteBuffer.allocateDirect(requiredCapacity);
byteBuffer.limit(size);
return byteBuffer;
@@ -141,43 +138,18 @@ public class NIOSequentialFileFactory extends
AbstractSequentialFileFactory {
}
@Override
- public ByteBuffer newBuffer(final int size) {
+ public ByteBuffer newBuffer(int size) {
if (!this.bufferPooling) {
return allocateDirectBuffer(size);
} else {
- final int requiredCapacity = align(size, DEFAULT_CAPACITY_ALIGNMENT);
- ByteBuffer byteBuffer = bytesPool.get();
- if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) {
- //do not free the old one (if any) until the new one will be
released into the pool!
- byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
- } else {
- bytesPool.set(null);
-
PlatformDependent.setMemory(PlatformDependent.directBufferAddress(byteBuffer),
size, (byte) 0);
- byteBuffer.clear();
- }
- byteBuffer.limit(size);
- return byteBuffer;
+ return bytesPool.borrow(size, true);
}
}
@Override
public void releaseBuffer(ByteBuffer buffer) {
if (this.bufferPooling) {
- if (buffer.isDirect()) {
- final ByteBuffer byteBuffer = bytesPool.get();
- if (byteBuffer != buffer) {
- //replace with the current pooled only if greater or null
- if (byteBuffer == null || buffer.capacity() >
byteBuffer.capacity()) {
- if (byteBuffer != null) {
- //free the smaller one
- PlatformDependent.freeDirectBuffer(byteBuffer);
- }
- bytesPool.set(buffer);
- } else {
- PlatformDependent.freeDirectBuffer(buffer);
- }
- }
- }
+ bytesPool.release(buffer);
}
}
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/ByteBufferPool.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/ByteBufferPool.java
new file mode 100644
index 0000000..bebb514
--- /dev/null
+++
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/ByteBufferPool.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io.util;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Object Pool that allows to borrow and release {@link ByteBuffer}s according
to a specific type (direct/heap).<br>
+ * The suggested usage pattern is:
+ * <pre>{@code
+ * ByteBuffer buffer = pool.borrow(size);
+ * //...using buffer...
+ * pool.release(buffer);
+ * }</pre>
+ */
+public interface ByteBufferPool {
+
+ /**
+ * It returns a {@link ByteBuffer} with {@link
ByteBuffer#capacity()}>={@code size}.<br>
+ * The {@code buffer} is zeroed until {@code size} if {@code zeroed=true},
with {@link ByteBuffer#position()}=0 and {@link ByteBuffer#limit()}={@code
size}.
+ */
+ ByteBuffer borrow(int size, boolean zeroed);
+
+ /**
+ * It pools or free {@code buffer} that cannot be used anymore.<br>
+ * If {@code buffer} is of a type different from the one that the pool can
borrow, it will ignore it.
+ */
+ void release(ByteBuffer buffer);
+
+ /**
+ * Factory method that creates a thread-local pool of capacity 1 of {@link
ByteBuffer}s of the specified type (direct/heap).
+ */
+ static ByteBufferPool threadLocal(boolean direct) {
+ return new ThreadLocalByteBufferPool(direct);
+ }
+
+}
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/ThreadLocalByteBufferPool.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/ThreadLocalByteBufferPool.java
new file mode 100644
index 0000000..eee0466
--- /dev/null
+++
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/ThreadLocalByteBufferPool.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io.util;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+import io.netty.util.internal.PlatformDependent;
+import org.apache.activemq.artemis.utils.PowerOf2Util;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.Env;
+
+final class ThreadLocalByteBufferPool implements ByteBufferPool {
+
+ private final ThreadLocal<ByteBuffer> bytesPool;
+ private final boolean direct;
+
+ ThreadLocalByteBufferPool(boolean direct) {
+ this.bytesPool = new ThreadLocal<>();
+ this.direct = direct;
+ }
+
+ @Override
+ public ByteBuffer borrow(final int size, boolean zeroed) {
+ final int requiredCapacity = PowerOf2Util.align(size, Env.osPageSize());
+ ByteBuffer byteBuffer = bytesPool.get();
+ if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) {
+ //do not free the old one (if any) until the new one will be released
into the pool!
+ byteBuffer = direct ? ByteBuffer.allocateDirect(requiredCapacity) :
ByteBuffer.allocate(requiredCapacity);
+ } else {
+ bytesPool.set(null);
+ if (zeroed) {
+ ByteUtil.zeros(byteBuffer, 0, size);
+ }
+ byteBuffer.clear();
+ }
+ byteBuffer.limit(size);
+ return byteBuffer;
+ }
+
+ @Override
+ public void release(ByteBuffer buffer) {
+ Objects.requireNonNull(buffer);
+ boolean directBuffer = buffer.isDirect();
+ if (directBuffer == direct && !buffer.isReadOnly()) {
+ final ByteBuffer byteBuffer = bytesPool.get();
+ if (byteBuffer != buffer) {
+ //replace with the current pooled only if greater or null
+ if (byteBuffer == null || buffer.capacity() >
byteBuffer.capacity()) {
+ if (byteBuffer != null) {
+ //free the smaller one
+ if (directBuffer) {
+ PlatformDependent.freeDirectBuffer(byteBuffer);
+ }
+ }
+ bytesPool.set(buffer);
+ } else {
+ if (directBuffer) {
+ PlatformDependent.freeDirectBuffer(buffer);
+ }
+ }
+ }
+ }
+ }
+
+}
diff --git
a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/util/ThreadLocalByteBufferPoolTest.java
b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/util/ThreadLocalByteBufferPoolTest.java
new file mode 100644
index 0000000..190d7de
--- /dev/null
+++
b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/util/ThreadLocalByteBufferPoolTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io.util;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import io.netty.util.internal.PlatformDependent;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ThreadLocalByteBufferPoolTest {
+
+ //testing using heap buffers to avoid killing the test suite
+ private static final boolean isDirect = false;
+ private final ByteBufferPool pool = ByteBufferPool.threadLocal(isDirect);
+ private final boolean zeroed;
+
+ public ThreadLocalByteBufferPoolTest(boolean zeroed) {
+ this.zeroed = zeroed;
+ }
+
+ @Parameterized.Parameters(name = "zeroed={0}")
+ public static Collection<Object[]> getParams() {
+ return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}});
+ }
+
+ private static void assertZeroed(ByteBuffer buffer) {
+ ByteBuffer bb = buffer.slice();
+ final byte[] content = new byte[bb.remaining()];
+ bb.get(content);
+ final byte[] zeroed = new byte[content.length];
+ Arrays.fill(zeroed, (byte) 0);
+ Assert.assertArrayEquals(zeroed, content);
+ }
+
+ @Test
+ public void shouldBorrowOnlyBuffersOfTheCorrectType() {
+ Assert.assertEquals(isDirect, pool.borrow(0, zeroed).isDirect());
+ }
+
+ @Test
+ public void shouldBorrowZeroedBuffer() {
+ final int size = 32;
+ final ByteBuffer buffer = pool.borrow(size, zeroed);
+ Assert.assertEquals(0, buffer.position());
+ Assert.assertEquals(size, buffer.limit());
+ if (zeroed) {
+ assertZeroed(buffer);
+ }
+ }
+
+ @Test
+ public void shouldBorrowTheSameBuffer() {
+ final int size = 32;
+ final ByteBuffer buffer = pool.borrow(size, zeroed);
+ buffer.put(0, (byte) 1);
+ buffer.position(1);
+ buffer.limit(2);
+ pool.release(buffer);
+ final int newSize = size - 1;
+ final ByteBuffer sameBuffer = pool.borrow(newSize, zeroed);
+ Assert.assertSame(buffer, sameBuffer);
+ Assert.assertEquals(0, sameBuffer.position());
+ Assert.assertEquals(newSize, sameBuffer.limit());
+ if (zeroed) {
+ assertZeroed(sameBuffer);
+ }
+ }
+
+ @Test
+ public void shouldBorrowNewBufferIfExceedPooledCapacity() {
+ final int size = 32;
+ final ByteBuffer buffer = pool.borrow(size, zeroed);
+ pool.release(buffer);
+ final int newSize = buffer.capacity() + 1;
+ final ByteBuffer differentBuffer = pool.borrow(newSize, zeroed);
+ Assert.assertNotSame(buffer, differentBuffer);
+ }
+
+ @Test
+ public void shouldPoolTheBiggestBuffer() {
+ final int size = 32;
+ final ByteBuffer small = pool.borrow(size, zeroed);
+ final ByteBuffer big = pool.borrow(small.capacity() + 1, zeroed);
+ pool.release(small);
+ big.limit(0);
+ pool.release(big);
+ Assert.assertSame(big, pool.borrow(big.capacity(), zeroed));
+ }
+
+ @Test
+ public void shouldNotPoolTheSmallestBuffer() {
+ final int size = 32;
+ final ByteBuffer small = pool.borrow(size, zeroed);
+ final ByteBuffer big = pool.borrow(small.capacity() + 1, zeroed);
+ big.limit(0);
+ pool.release(big);
+ pool.release(small);
+ Assert.assertSame(big, pool.borrow(big.capacity(), zeroed));
+ }
+
+ @Test
+ public void shouldNotPoolBufferOfDifferentType() {
+ final int size = 32;
+ final ByteBuffer buffer = isDirect ? ByteBuffer.allocate(size) :
ByteBuffer.allocateDirect(size);
+ try {
+ pool.release(buffer);
+ Assert.assertNotSame(buffer, pool.borrow(size, zeroed));
+ } catch (Throwable t) {
+ if (PlatformDependent.hasUnsafe()) {
+ if (buffer.isDirect()) {
+ PlatformDependent.freeDirectBuffer(buffer);
+ }
+ }
+ }
+ }
+
+ @Test
+ public void shouldNotPoolReadOnlyBuffer() {
+ final int size = 32;
+ final ByteBuffer borrow = pool.borrow(size, zeroed);
+ final ByteBuffer readOnlyBuffer = borrow.asReadOnlyBuffer();
+ pool.release(readOnlyBuffer);
+ Assert.assertNotSame(readOnlyBuffer, pool.borrow(size, zeroed));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldFailPoolingNullBuffer() {
+ pool.release(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldFailPoolingNullBufferIfNotEmpty() {
+ final int size = 32;
+ pool.release(pool.borrow(size, zeroed));
+ pool.release(null);
+ }
+
+ @Test
+ public void shouldBorrowOnlyThreadLocalBuffers() throws ExecutionException,
InterruptedException {
+ final int size = 32;
+ final ByteBuffer buffer = pool.borrow(size, zeroed);
+ pool.release(buffer);
+ final ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ Assert.assertNotSame(buffer, executor.submit(() -> pool.borrow(size,
zeroed)).get());
+ } finally {
+ executor.shutdown();
+ }
+ }
+
+}