IGNITE-5604 - Expand WAL iterator buffer if record size is greater than current buffer size - Fixes #2244.
Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/69357c5d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/69357c5d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/69357c5d Branch: refs/heads/master Commit: 69357c5d8be431aa51fc3add9e345807fe984fee Parents: 905e34d Author: Dmitriy Govorukhin <[email protected]> Authored: Wed Jul 5 19:24:47 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed Jul 5 19:24:47 2017 +0300 ---------------------------------------------------------------------- .../rendezvous/RendezvousAffinityFunction.java | 4 - .../wal/AbstractWalRecordsIterator.java | 14 ++- .../persistence/wal/ByteBufferExpander.java | 47 +++++++++ .../cache/persistence/wal/FileInput.java | 20 +++- .../wal/FileWriteAheadLogManager.java | 2 +- .../db/wal/IgniteWalRecoveryTest.java | 100 ++++++++++++++----- 6 files changed, 146 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/69357c5d/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java index 1bd0587..0fb20ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java @@ -17,10 +17,6 @@ package org.apache.ignite.cache.affinity.rendezvous; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; http://git-wip-us.apache.org/repos/asf/ignite/blob/69357c5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java index 7dc0a28..f4bace1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java @@ -22,7 +22,6 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; -import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.FileChannel; import org.apache.ignite.IgniteCheckedException; @@ -41,8 +40,8 @@ import org.jetbrains.annotations.Nullable; * Iterator over WAL segments. This abstract class provides most functionality for reading records in log. * Subclasses are to override segment switching functionality */ -public abstract class AbstractWalRecordsIterator extends GridCloseableIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>> - implements WALIterator { +public abstract class AbstractWalRecordsIterator + extends GridCloseableIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>> implements WALIterator { /** */ private static final long serialVersionUID = 0L; @@ -73,7 +72,7 @@ public abstract class AbstractWalRecordsIterator extends GridCloseableIteratorAd @NotNull private final RecordSerializer serializer; /** Utility buffer for reading records */ - private final ByteBuffer buf; + private final ByteBufferExpander buf; /** * @param log Logger @@ -85,15 +84,14 @@ public abstract class AbstractWalRecordsIterator extends GridCloseableIteratorAd @NotNull final IgniteLogger log, @NotNull final GridCacheSharedContext sharedCtx, @NotNull final RecordSerializer serializer, - final int bufSize) { + final int bufSize + ) { this.log = log; this.sharedCtx = sharedCtx; this.serializer = serializer; // Do not allocate direct buffer for iterator. - buf = ByteBuffer.allocate(bufSize); - buf.order(ByteOrder.nativeOrder()); - + buf = new ByteBufferExpander(bufSize, ByteOrder.nativeOrder()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/69357c5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java new file mode 100644 index 0000000..75d3a98 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferExpander.java @@ -0,0 +1,47 @@ +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * ByteBuffer wrapper for dynamically expand buffer size. + */ +public class ByteBufferExpander { + /** Byte buffer */ + private ByteBuffer buf; + + public ByteBufferExpander(int initSize, ByteOrder order) { + ByteBuffer buffer = ByteBuffer.allocate(initSize); + buffer.order(order); + + this.buf = buffer; + } + + /** + * Current byte buffer. + * + * @return Current byteBuffer. + */ + public ByteBuffer buffer() { + return buf; + } + + /** + * Expands current byte buffer to the requested size. + * + * @return ByteBuffer with requested size. + */ + public ByteBuffer expand(int size) { + ByteBuffer newBuf = ByteBuffer.allocate(size); + + newBuf.order(buf.order()); + + newBuf.put(buf); + + newBuf.flip(); + + buf = newBuf; + + return newBuf; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/69357c5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java index e2d7cba..00c7c02 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java @@ -42,6 +42,9 @@ public final class FileInput implements ByteBufferBackedDataInput { /** */ private long pos; + /** */ + private ByteBufferExpander expBuf; + /** * @param ch Channel to read from * @param buf Buffer for reading blocks of data into @@ -58,6 +61,16 @@ public final class FileInput implements ByteBufferBackedDataInput { } /** + * @param ch Channel to read from + * @param expBuf ByteBufferWrapper with ability expand buffer dynamically. + */ + public FileInput(FileChannel ch, ByteBufferExpander expBuf) throws IOException { + this(ch, expBuf.buffer()); + + this.expBuf = expBuf; + } + + /** * Clear buffer. */ private void clearBuffer() { @@ -96,8 +109,11 @@ public final class FileInput implements ByteBufferBackedDataInput { if (available >= requested) return; - if (buf.capacity() < requested) - throw new IOException("Requested size is greater than buffer: " + requested); + if (buf.capacity() < requested) { + buf = expBuf.expand(requested); + + assert available == buf.remaining(); + } buf.compact(); http://git-wip-us.apache.org/repos/asf/ignite/blob/69357c5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 8993112..162f43d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -2327,7 +2327,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl super(log, cctx, serializer, - Math.min(16 * tlbSize, psCfg.getWalRecordIteratorBufferSize())); + psCfg.getWalRecordIteratorBufferSize()); this.walWorkDir = walWorkDir; this.walArchiveDir = walArchiveDir; this.psCfg = psCfg; http://git-wip-us.apache.org/repos/asf/ignite/blob/69357c5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java index 6b4907c..843fb5b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java @@ -136,6 +136,8 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { PersistentStoreConfiguration pCfg = new PersistentStoreConfiguration(); + pCfg.setWalRecordIteratorBufferSize(1024 * 1024); + if (logOnly) pCfg.setWalMode(WALMode.LOG_ONLY); @@ -180,46 +182,79 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { * @throws Exception if failed. */ public void testWalBig() throws Exception { - try { - IgniteEx ignite = startGrid(1); + IgniteEx ignite = startGrid(1); - ignite.active(true); + ignite.active(true); - IgniteCache<Object, Object> cache = ignite.cache("partitioned"); + IgniteCache<Object, Object> cache = ignite.cache("partitioned"); - Random rnd = new Random(); + Random rnd = new Random(); - Map<Integer, IndexedObject> map = new HashMap<>(); + Map<Integer, IndexedObject> map = new HashMap<>(); - for (int i = 0; i < 10_000; i++) { - if (i % 1000 == 0) - X.println(" >> " + i); + for (int i = 0; i < 10_000; i++) { + if (i % 1000 == 0) + X.println(" >> " + i); - int k = rnd.nextInt(300_000); - IndexedObject v = new IndexedObject(rnd.nextInt(10_000)); + int k = rnd.nextInt(300_000); + IndexedObject v = new IndexedObject(rnd.nextInt(10_000)); - cache.put(k, v); - map.put(k, v); - } + cache.put(k, v); + map.put(k, v); + } - // Check. - for (Integer k : map.keySet()) - assertEquals(map.get(k), cache.get(k)); + // Check. + for (Integer k : map.keySet()) + assertEquals(map.get(k), cache.get(k)); - stopGrid(1); + stopGrid(1); - ignite = startGrid(1); + ignite = startGrid(1); - ignite.active(true); + ignite.active(true); - cache = ignite.cache("partitioned"); + cache = ignite.cache("partitioned"); - // Check. - for (Integer k : map.keySet()) - assertEquals(map.get(k), cache.get(k)); + // Check. + for (Integer k : map.keySet()) + assertEquals(map.get(k), cache.get(k)); + } + + /** + * @throws Exception if failed. + */ + public void testWalBigObjectNodeCancel() throws Exception { + final int MAX_SIZE_POWER = 21; + + IgniteEx ignite = startGrid(1); + + ignite.active(true); + + IgniteCache<Object, Object> cache = ignite.cache("partitioned"); + + for (int i = 0; i < MAX_SIZE_POWER; ++i) { + int size = 1 << i; + + cache.put("key_" + i, createTestData(size)); } - finally { - stopAllGrids(); + + stopGrid(1, true); + + ignite = startGrid(1); + + ignite.active(true); + + cache = ignite.cache("partitioned"); + + // Check. + for (int i = 0; i < MAX_SIZE_POWER; ++i) { + int size = 1 << i; + + int[] data = createTestData(size); + + int[] val = (int[])cache.get("key_" + i); + + assertTrue("Invalid data. [key=key_" + i + ']', Arrays.equals(data, val)); } } @@ -977,6 +1012,19 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { } /** + * @param size Size of data. + * @return Test data. + */ + private int[] createTestData(int size) { + int[] data = new int[size]; + + for (int d = 0; d < size; ++d) + data[d] = d; + + return data; + } + + /** * */ private static class LoadRunnable implements IgniteRunnable {
