IGNITE-5741 - Replaced HeapByteBuffer with DirectByteBuffer in WAL records iterator - Fixes #2329.
Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c23a2dcf Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c23a2dcf Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c23a2dcf Branch: refs/heads/ignite-5947 Commit: c23a2dcfb1395e87cb4e14457a053c6b4727b318 Parents: 13f38d7 Author: Dmitriy Govorukhin <[email protected]> Authored: Mon Aug 14 16:33:12 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Mon Aug 14 16:41:34 2017 +0300 ---------------------------------------------------------------------- .../wal/AbstractWalRecordsIterator.java | 11 ++++- .../persistence/wal/ByteBufferExpander.java | 22 ++++++--- .../wal/FileWriteAheadLogManager.java | 11 ++--- .../reader/StandaloneWalRecordsIterator.java | 9 ++-- .../apache/ignite/internal/util/GridUnsafe.java | 14 ++++++ .../db/wal/crc/IgniteDataIntegrityTests.java | 52 +++++++++++++++++++- 6 files changed, 100 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c23a2dcf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java index beed90b..db949c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java @@ -95,7 +95,6 @@ public abstract class AbstractWalRecordsIterator this.serializer = serializer; this.ioFactory = ioFactory; - // Do not allocate direct buffer for iterator. buf = new ByteBufferExpander(bufSize, ByteOrder.nativeOrder()); } @@ -128,6 +127,16 @@ public abstract class AbstractWalRecordsIterator return curRec != null; } + /** {@inheritDoc} */ + @Override protected void onClose() throws IgniteCheckedException { + try { + buf.close(); + } + catch (Exception ex) { + throw new IgniteCheckedException(ex); + } + } + /** * Switches records iterator to the next record. * <ul> http://git-wip-us.apache.org/repos/asf/ignite/blob/c23a2dcf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java index 829cd5c..cf1db84 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java @@ -19,19 +19,24 @@ package org.apache.ignite.internal.processors.cache.persistence.wal; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import org.apache.ignite.internal.util.GridUnsafe; /** * ByteBuffer wrapper for dynamically expand buffer size. */ -public class ByteBufferExpander { +public class ByteBufferExpander implements AutoCloseable { /** Byte buffer */ private ByteBuffer buf; + /** + * @param initSize Initial size. + * @param order Byte order. + */ public ByteBufferExpander(int initSize, ByteOrder order) { - ByteBuffer buffer = ByteBuffer.allocate(initSize); + ByteBuffer buffer = GridUnsafe.allocateBuffer(initSize); buffer.order(order); - this.buf = buffer; + buf = buffer; } /** @@ -49,16 +54,17 @@ public class ByteBufferExpander { * @return ByteBuffer with requested size. */ public ByteBuffer expand(int size) { - ByteBuffer newBuf = ByteBuffer.allocate(size); + ByteBuffer newBuf = GridUnsafe.reallocateBuffer(buf, size); newBuf.order(buf.order()); - newBuf.put(buf); - - newBuf.flip(); - buf = newBuf; return newBuf; } + + /** {@inheritDoc} */ + @Override public void close() { + GridUnsafe.freeBuffer(buf); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c23a2dcf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 17db8f8..bb1f910 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -1430,12 +1430,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl */ private int readSerializerVersion(FileIO io, File file, long idx) throws IOException, IgniteCheckedException { - try { - ByteBuffer buf = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE); - buf.order(ByteOrder.nativeOrder()); - - FileInput in = new FileInput(io, - new ByteBufferExpander(RecordV1Serializer.HEADER_RECORD_SIZE, ByteOrder.nativeOrder())); + try (ByteBufferExpander buf = new ByteBufferExpander(RecordV1Serializer.HEADER_RECORD_SIZE, ByteOrder.nativeOrder())){ + FileInput in = new FileInput(io, buf); // Header record must be agnostic to the serializer version. WALRecord rec = serializer.readRecord(in, new FileWALPointer(idx, 0, 0)); @@ -2402,9 +2398,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override protected void onClose() throws IgniteCheckedException { + super.onClose(); + curRec = null; final ReadFileHandle handle = closeCurrentWalSegment(); + if (handle != null && handle.workDir) releaseWorkSegment(curWalSegmIdx); http://git-wip-us.apache.org/repos/asf/ignite/blob/c23a2dcf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java index 85022ad..cd0f8ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java @@ -180,9 +180,11 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { FileWALPointer ptr; - try (FileIO fileIO = ioFactory.create(file, "r")) { - final DataInput in = new FileInput(fileIO, - new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder())); + try ( + FileIO fileIO = ioFactory.create(file, "r"); + ByteBufferExpander buf = new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder()) + ) { + final DataInput in = new FileInput(fileIO, buf); // Header record must be agnostic to the serializer version. final int type = in.readUnsignedByte(); @@ -256,6 +258,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { /** {@inheritDoc} */ @Override protected void onClose() throws IgniteCheckedException { super.onClose(); + curRec = null; closeCurrentWalSegment(); http://git-wip-us.apache.org/repos/asf/ignite/blob/c23a2dcf/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java index 0add64d..15e6f2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java @@ -139,6 +139,20 @@ public abstract class GridUnsafe { } /** + * + * @param buf Buffer. + * @param len New length. + * @return Reallocated direct buffer. + */ + public static ByteBuffer reallocateBuffer(ByteBuffer buf, int len) { + long ptr = bufferAddress(buf); + + long newPtr = reallocateMemory(ptr, len); + + return wrapPointer(newPtr, len); + } + + /** * Gets boolean value from object field. * * @param obj Object. http://git-wip-us.apache.org/repos/asf/ignite/blob/c23a2dcf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java index b93c74d..270c560 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java @@ -41,6 +41,9 @@ public class IgniteDataIntegrityTests extends TestCase { /** Random access file. */ private RandomAccessFile randomAccessFile; + /** Buffer expander. */ + private ByteBufferExpander expBuf; + /** {@inheritDoc} */ @Override protected void setUp() throws Exception { super.setUp(); @@ -50,9 +53,11 @@ public class IgniteDataIntegrityTests extends TestCase { randomAccessFile = new RandomAccessFile(file, "rw"); + expBuf = new ByteBufferExpander(1024, ByteOrder.BIG_ENDIAN); + fileInput = new FileInput( new RandomAccessFileIO(randomAccessFile), - new ByteBufferExpander(1024, ByteOrder.BIG_ENDIAN) + expBuf ); ByteBuffer buf = ByteBuffer.allocate(1024); @@ -70,6 +75,12 @@ public class IgniteDataIntegrityTests extends TestCase { randomAccessFile.getFD().sync(); } + /** {@inheritDoc} */ + @Override protected void tearDown() throws Exception { + randomAccessFile.close(); + expBuf.close(); + } + /** * */ @@ -108,6 +119,45 @@ public class IgniteDataIntegrityTests extends TestCase { } /** + * + */ + public void testExpandBuffer() { + ByteBufferExpander expBuf = new ByteBufferExpander(16, ByteOrder.nativeOrder()); + + ByteBuffer b1 = expBuf.buffer(); + + b1.put((byte)1); + b1.putInt(2); + b1.putLong(3L); + + assertEquals(13, b1.position()); + assertEquals(16, b1.limit()); + + ByteBuffer b2 = expBuf.expand(32); + + assertEquals(0, b2.position()); + assertEquals((byte)1, b2.get()); + assertEquals(2, b2.getInt()); + assertEquals(3L, b2.getLong()); + assertEquals(13, b2.position()); + assertEquals(32, b2.limit()); + + b2.putInt(4); + + assertEquals(17, b2.position()); + assertEquals(32, b2.limit()); + + b2.flip(); + + assertEquals(0, b2.position()); + assertEquals((byte)1, b2.get()); + assertEquals(2, b2.getInt()); + assertEquals(3L, b2.getLong()); + assertEquals(4, b2.getInt()); + assertEquals(17, b2.limit()); + } + + /** * @param rangeFrom Range from. * @param rangeTo Range to. */
