Repository: ignite Updated Branches: refs/heads/ignite-2.7 76dbc5245 -> 82df92362
http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleFileInput.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleFileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleFileInput.java new file mode 100644 index 0000000..5918b0b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleFileInput.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.io; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander; +import org.jetbrains.annotations.NotNull; + +/** + * File input, backed by byte buffer file input. 
+ * This class allows to read data by chunks from file and then read primitives + */ +public class SimpleFileInput implements FileInput { + /** + * Buffer for reading blocks of data into. + * <b>Note:</b> biggest block requested from this input can't be longer than buffer capacity + */ + private ByteBuffer buf; + + /** I/O interface for read/write operations with file */ + protected FileIO io; + + /** */ + private long pos; + + /** */ + private ByteBufferExpander expBuf; + + /** + * @param io FileIO to read from. + * @param buf Buffer for reading blocks of data into. + */ + public SimpleFileInput(FileIO io, ByteBufferExpander buf) throws IOException { + assert io != null; + + this.io = io; + this.buf = buf.buffer(); + + expBuf = buf; + + pos = io.position(); + + clearBuffer(); + } + + /** {@inheritDoc} */ + @Override public FileIO io() { + return io; + } + + /** + * Clear buffer. + */ + private void clearBuffer() { + buf.clear(); + buf.limit(0); + + assert buf.remaining() == 0; // Buffer is empty. + } + + /** {@inheritDoc} */ + @Override public void seek(long pos) throws IOException { + if (pos > io.size()) + throw new EOFException(); + + io.position(pos); + + this.pos = pos; + + clearBuffer(); + } + + /** + * @return Underlying buffer. 
+ */ + @Override public ByteBuffer buffer() { + return buf; + } + + + /** {@inheritDoc} */ + @Override public void ensure(int requested) throws IOException { + int available = buf.remaining(); + + if (available >= requested) + return; + + if (buf.capacity() < requested) { + if (expBuf == null) + throw new IOException("Requested size is greater than buffer: " + requested); + + buf = expBuf.expand(requested); + + assert available == buf.remaining(); + } + + buf.compact(); + + do { + int read = io.read(buf); + + if (read == -1) + throw new EOFException("EOF at position [" + io.position() + "] expected to read [" + requested + "] bytes"); + + available += read; + + pos += read; + } + while (available < requested); + + buf.flip(); + } + + /** + * @return Position in the stream. + */ + @Override public long position() { + return pos - buf.remaining(); + } + + /** + * {@inheritDoc} + */ + @Override public void readFully(@NotNull byte[] b) throws IOException { + ensure(b.length); + + buf.get(b); + } + + /** + * {@inheritDoc} + */ + @Override public void readFully(@NotNull byte[] b, int off, int len) throws IOException { + ensure(len); + + buf.get(b, off, len); + } + + /** + * {@inheritDoc} + */ + @Override public int skipBytes(int n) throws IOException { + if (buf.remaining() >= n) + buf.position(buf.position() + n); + else + seek(position() + n); // Skip from the logical position: pos already counts the bytes still buffered. + + return n; + } + + /** + * {@inheritDoc} + */ + @Override public boolean readBoolean() throws IOException { + return readByte() == 1; + } + + /** + * {@inheritDoc} + */ + @Override public byte readByte() throws IOException { + ensure(1); + + return buf.get(); + } + + /** + * {@inheritDoc} + */ + @Override public int readUnsignedByte() throws IOException { + return readByte() & 0xFF; + } + + /** + * {@inheritDoc} + */ + @Override public short readShort() throws IOException { + ensure(2); + + return buf.getShort(); + } + + /** + * {@inheritDoc} + */ + @Override public int readUnsignedShort() throws IOException { + return readShort() & 
0xFFFF; + } + + /** + * {@inheritDoc} + */ + @Override public char readChar() throws IOException { + ensure(2); + + return buf.getChar(); + } + + /** + * {@inheritDoc} + */ + @Override public int readInt() throws IOException { + ensure(4); + + return buf.getInt(); + } + + /** + * {@inheritDoc} + */ + @Override public long readLong() throws IOException { + ensure(8); + + return buf.getLong(); + } + + /** + * {@inheritDoc} + */ + @Override public float readFloat() throws IOException { + ensure(4); + + return buf.getFloat(); + } + + /** + * {@inheritDoc} + */ + @Override public double readDouble() throws IOException { + ensure(8); + + return buf.getDouble(); + } + + /** + * {@inheritDoc} + */ + @Override public String readLine() throws IOException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public String readUTF() throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * @param skipCheck If CRC check should be skipped. + * @return autoclosable fileInput, after its closing crc32 will be calculated and compared with saved one + */ + public Crc32CheckingFileInput startRead(boolean skipCheck) { + return new Crc32CheckingFileInput(this, skipCheck); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleSegmentFileInputFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleSegmentFileInputFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleSegmentFileInputFactory.java new file mode 100644 index 0000000..2e5ea75 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleSegmentFileInputFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.io; + +import java.io.IOException; +import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander; + +/** + * Simple implementation of {@link SegmentFileInputFactory}. 
+ */ +public class SimpleSegmentFileInputFactory implements SegmentFileInputFactory { + + /** {@inheritDoc} */ + @Override public FileInput createFileInput(SegmentIO segmentIO, + ByteBufferExpander buf) throws IOException { + return new SimpleFileInput(segmentIO, buf); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java index aaff33a..f9388ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java @@ -39,13 +39,13 @@ import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; -import org.apache.ignite.internal.processors.cache.persistence.file.UnzipFileIO; import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander; import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor; -import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; +import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleSegmentFileInputFactory; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.U; @@ -67,6 +67,9 @@ public class IgniteWalIteratorFactory { /** Logger. */ private final IgniteLogger log; + /** */ + private final SegmentFileInputFactory segmentFileInputFactory = new SimpleSegmentFileInputFactory(); + /** * Creates WAL files iterator factory. * WAL iterator supports automatic converting from CacheObjects and KeyCacheObject into BinaryObjects @@ -319,10 +322,10 @@ public class IgniteWalIteratorFactory { FileDescriptor ds = new FileDescriptor(file); try ( - FileIO fileIO = ds.isCompressed() ? new UnzipFileIO(file) : ioFactory.create(file); + SegmentIO fileIO = ds.toIO(ioFactory); ByteBufferExpander buf = new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder()) ) { - final DataInput in = new FileInput(fileIO, buf); + final DataInput in = segmentFileInputFactory.createFileInput(fileIO, buf); // Header record must be agnostic to the serializer version. 
final int type = in.readUnsignedByte(); http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java index 7cfb66d..c33a45b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java @@ -37,15 +37,16 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; -import org.apache.ignite.internal.processors.cache.persistence.file.UnzipFileIO; import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator; import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor; -import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.ReadFileHandle; import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException; +import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleSegmentFileInputFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl; @@ -72,6 +73,9 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { /** */ private static final long serialVersionUID = 0L; + + /** Factory to provide I/O interfaces for read primitives with files. */ + private static final SegmentFileInputFactory FILE_INPUT_FACTORY = new SimpleSegmentFileInputFactory(); /** * File descriptors remained to scan. * <code>null</code> value means directory scan mode @@ -118,7 +122,8 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { sharedCtx, new RecordSerializerFactoryImpl(sharedCtx, readTypeFilter), ioFactory, - initialReadBufferSize + initialReadBufferSize, + FILE_INPUT_FACTORY ); this.lowBound = lowBound; @@ -262,13 +267,13 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { ) throws IgniteCheckedException, FileNotFoundException { AbstractFileDescriptor fd = desc; - FileIO fileIO = null; + SegmentIO fileIO = null; SegmentHeader segmentHeader; while (true) { try { - fileIO = fd.isCompressed() ? 
new UnzipFileIO(fd.file()) : ioFactory.create(fd.file()); + fileIO = fd.toIO(ioFactory); - segmentHeader = readSegmentHeader(fileIO, curWalSegmIdx); + segmentHeader = readSegmentHeader(fileIO, FILE_INPUT_FACTORY); break; } @@ -422,8 +427,8 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { /** {@inheritDoc} */ @Override protected AbstractReadFileHandle createReadFileHandle( - FileIO fileIO, long idx, RecordSerializer ser, FileInput in + SegmentIO fileIO, RecordSerializer ser, FileInput in ) { - return new ReadFileHandle(fileIO, idx, ser, in); + return new ReadFileHandle(fileIO, ser, in, null); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java index c5760ab..a9d793d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java @@ -22,7 +22,7 @@ import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; -import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput; /** * Record serializer. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java index ca484ce..afd770d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java @@ -29,13 +29,15 @@ import org.apache.ignite.internal.pagemem.wal.record.FilteredRecord; import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType; -import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO; import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput; import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander; -import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO; +import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleFileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; @@ -246,14 +248,14 @@ public class RecordV1Serializer implements RecordSerializer { * NOTE: Method mutates position of {@code io}. * * @param io I/O interface for file. - * @param expectedIdx Expected WAL segment index for readable record. + * @param segmentFileInputFactory File input factory. * @return Instance of {@link SegmentHeader} extracted from the file. * @throws IgniteCheckedException If failed to read serializer version. */ - public static SegmentHeader readSegmentHeader(FileIO io, long expectedIdx) + public static SegmentHeader readSegmentHeader(SegmentIO io, SegmentFileInputFactory segmentFileInputFactory) throws IgniteCheckedException, IOException { try (ByteBufferExpander buf = new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder())) { - FileInput in = new FileInput(io, buf); + ByteBufferBackedDataInput in = segmentFileInputFactory.createFileInput(io, buf); in.ensure(HEADER_RECORD_SIZE); @@ -270,7 +272,7 @@ public class RecordV1Serializer implements RecordSerializer { // Read file pointer. 
FileWALPointer ptr = readPosition(in); - if (expectedIdx != ptr.index()) + if (io.getSegmentId() != ptr.index()) throw new SegmentEofException("Reached logical end of the segment by pointer", null); assert ptr.fileOffset() == 0 : "Header record should be placed at the beginning of file " + ptr; @@ -364,7 +366,7 @@ public class RecordV1Serializer implements RecordSerializer { ) throws EOFException, IgniteCheckedException { long startPos = -1; - try (FileInput.Crc32CheckingFileInput in = in0.startRead(skipCrc)) { + try (SimpleFileInput.Crc32CheckingFileInput in = in0.startRead(skipCrc)) { startPos = in0.position(); WALRecord res = reader.readWithHeaders(in, expPtr); http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java index 68e55e0..e112522 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java @@ -28,7 +28,7 @@ import org.apache.ignite.internal.pagemem.wal.record.FilteredRecord; import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput; -import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput; import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException; import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException; http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java index dd10479..54a7d8c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java @@ -65,8 +65,8 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { new DataRegionConfiguration() .setMaxSize(200L * 1024 * 1024) .setPersistenceEnabled(true)) - .setWalMode(WALMode.LOG_ONLY) - .setWalSegmentSize(512 * 1024); + .setWalMode(WALMode.LOG_ONLY) + .setWalSegmentSize(512 * 1024); cfg.setDataStorageConfiguration(memCfg); @@ -124,8 +124,8 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { log.warning("Start loading"); - try (IgniteDataStreamer<Object, Object> st = ig0.dataStreamer("cache1")){ - for (int k = 0; k < entryCnt; k++){ + try (IgniteDataStreamer<Object, Object> st = ig0.dataStreamer("cache1")) { + for (int k = 0; k < entryCnt; k++) { st.addData(k, k); printProgress(k); @@ -158,10 +158,10 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { log.warning("Start loading"); - try 
(IgniteDataStreamer<Object, Object> st = ig0.dataStreamer("cache1")){ + try (IgniteDataStreamer<Object, Object> st = ig0.dataStreamer("cache1")) { st.allowOverwrite(true); - for (int k = 0; k < entryCnt; k++){ + for (int k = 0; k < entryCnt; k++) { st.addData(k, k); printProgress(k); @@ -193,16 +193,8 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { for (int g = 0; g < initGridCnt; g++) { IgniteEx ig = grid(g); - FileWriteAheadLogManager wal = (FileWriteAheadLogManager)ig.context().cache().context().wal(); - - Object reservationStorage = GridTestUtils.getFieldValue(wal, "reservationStorage"); - - synchronized (reservationStorage) { - Map reserved = GridTestUtils.getFieldValue(reservationStorage, "reserved"); - - if (reserved.isEmpty()) - return false; - } + if (isReserveListEmpty(ig)) + return false; } return true; @@ -220,16 +212,8 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { for (int g = 0; g < initGridCnt; g++) { IgniteEx ig = grid(g); - FileWriteAheadLogManager wal = (FileWriteAheadLogManager)ig.context().cache().context().wal(); - - Object reservationStorage = GridTestUtils.getFieldValue(wal, "reservationStorage"); - - synchronized (reservationStorage) { - Map reserved = GridTestUtils.getFieldValue(reservationStorage, "reserved"); - - if (!reserved.isEmpty()) - return false; - } + if (!isReserveListEmpty(ig)) + return false; // Reserve list is still non-empty on this node (same condition as the removed check). } return true; @@ -240,9 +224,26 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { } /** + * @return {@code true} if reserve list is empty. 
+ */ + private boolean isReserveListEmpty(IgniteEx ig) { + FileWriteAheadLogManager wal = (FileWriteAheadLogManager)ig.context().cache().context().wal(); + + Object segmentAware = GridTestUtils.getFieldValue(wal, "segmentAware"); + + synchronized (segmentAware) { + Map reserved = GridTestUtils.getFieldValue(GridTestUtils.getFieldValue(segmentAware, "reservationStorage"), "reserved"); + + if (reserved.isEmpty()) + return true; + } + return false; + } + + /** * */ - private void printProgress(int k){ + private void printProgress(int k) { if (k % 1000 == 0) log.warning("Keys -> " + k); } @@ -253,7 +254,7 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { public void testRemovesArePreloadedIfHistoryIsAvailable() throws Exception { int entryCnt = 10_000; - IgniteEx ig0 = (IgniteEx) startGrids(2); + IgniteEx ig0 = (IgniteEx)startGrids(2); ig0.active(true); @@ -302,7 +303,7 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { public void testNodeIsClearedIfHistoryIsUnavailable() throws Exception { int entryCnt = 10_000; - IgniteEx ig0 = (IgniteEx) startGrids(2); + IgniteEx ig0 = (IgniteEx)startGrids(2); ig0.active(true); @@ -362,7 +363,7 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { public void testWalHistoryPartiallyRemoved() throws Exception { int entryCnt = 10_000; - IgniteEx ig0 = (IgniteEx) startGrids(2); + IgniteEx ig0 = (IgniteEx)startGrids(2); ig0.cluster().active(true); @@ -386,7 +387,7 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { stopAllGrids(); U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), walArchPath + "/" + - nodeId0, false)); + nodeId0, false)); startGrid(0); @@ -436,16 +437,8 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { for (int g = 0; g < initGridCnt; g++) { IgniteEx ig = grid(g); - FileWriteAheadLogManager wal = (FileWriteAheadLogManager)ig.context().cache().context().wal(); - 
- - Object reservationStorage = GridTestUtils.getFieldValue(wal, "reservationStorage"); - - synchronized (reservationStorage) { - Map reserved = GridTestUtils.getFieldValue(reservationStorage, "reserved"); - - if (reserved.isEmpty()) - return false; - } + if (isReserveListEmpty(ig)) + return false; } return true; @@ -465,16 +458,8 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { for (int g = 0; g < initGridCnt - 1; g++) { IgniteEx ig = grid(g); - FileWriteAheadLogManager wal = (FileWriteAheadLogManager)ig.context().cache().context().wal(); - - Object reservationStorage = GridTestUtils.getFieldValue(wal, "reservationStorage"); - - synchronized (reservationStorage) { - Map reserved = GridTestUtils.getFieldValue(reservationStorage, "reserved"); - - if (!reserved.isEmpty()) - return false; - } + if (!isReserveListEmpty(ig)) + return false; // Reserve list is still non-empty on this node (same condition as the removed check). } return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java index 9dbef5d..74db28f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java @@ -18,13 +18,18 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal; import java.io.File; +import java.nio.channels.Channel; import java.nio.file.Paths; import java.util.Random; +import java.util.concurrent.CountDownLatch; +import 
java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.WALIterator; @@ -40,8 +45,11 @@ import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDataba import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.persistence.wal.FsyncModeFileWriteAheadLogManager; +import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl; @@ -58,6 +66,8 @@ import org.junit.Assert; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.METASTORE_DATA_RECORD; import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE; +import static org.apache.ignite.testframework.GridTestUtils.getFieldValueHierarchy; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; /*** * Test check correct switch segment if in the tail of segment have garbage. @@ -159,7 +169,7 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest { * * @throws Exception If some thing failed. */ - public void test() throws Exception { + public void testInvariantSwitchSegment() throws Exception { for (int serVer : checkSerializerVers) { for (Class walMgrClass : checkWalManagers) { try { @@ -175,6 +185,22 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest { } /** + * Test for check switch segment from work dir to archive dir during iteration. + * + * @throws Exception If some thing failed. + */ + public void testSwitchReadingSegmentFromWorkToArchive() throws Exception { + for (int serVer : checkSerializerVers) { + try { + checkSwitchReadingSegmentDuringIteration(FileWriteAheadLogManager.class, serVer); + } + finally { + U.delete(Paths.get(U.defaultWorkDirectory())); + } + } + } + + /** * @param walMgrClass WAL manager class. * @param serVer WAL serializer version. * @throws Exception If some thing failed. @@ -299,6 +325,101 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest { Assert.assertEquals("Not all records read during iteration.", expectedRecords, actualRecords); } + /** + * @param walMgrClass WAL manager class. + * @param serVer WAL serializer version. + * @throws Exception If some thing failed. 
+ */ + private void checkSwitchReadingSegmentDuringIteration(Class walMgrClass, int serVer) throws Exception { + String workDir = U.defaultWorkDirectory(); + + T2<IgniteWriteAheadLogManager, RecordSerializer> initTup = initiate(walMgrClass, serVer, workDir); + + IgniteWriteAheadLogManager walMgr = initTup.get1(); + + RecordSerializer recordSerializer = initTup.get2(); + + MetastoreDataRecord rec = new MetastoreDataRecord("0", new byte[100]); + + int recSize = recordSerializer.size(rec); + + // Add more record for rollover to the next segment. + int recordsToWrite = SEGMENT_SIZE / recSize + 100; + + SegmentAware segmentAware = GridTestUtils.getFieldValue(walMgr, "segmentAware"); + + //guard from archivation before iterator would be created. + segmentAware.checkCanReadArchiveOrReserveWorkSegment(0); + + for (int i = 0; i < recordsToWrite; i++) + walMgr.log(new MetastoreDataRecord(rec.key(), rec.value())); + + walMgr.flush(null, true); + + int expectedRecords = recordsToWrite; + AtomicInteger actualRecords = new AtomicInteger(0); + + AtomicReference<String> startedSegmentPath = new AtomicReference<>(); + AtomicReference<String> finishedSegmentPath = new AtomicReference<>(); + + CountDownLatch startedIteratorLatch = new CountDownLatch(1); + CountDownLatch finishedArchivedLatch = new CountDownLatch(1); + + IgniteInternalFuture<Object> future = GridTestUtils.runAsync( + () -> { + // Check that switch segment works as expected and all record is reachable. 
+ try (WALIterator it = walMgr.replay(null)) { + Object handle = getFieldValueHierarchy(it, "currWalSegment"); + + FileInput in = getFieldValueHierarchy(handle, "in"); + Object delegate = getFieldValueHierarchy(in.io(), "delegate"); + Channel ch = getFieldValueHierarchy(delegate, "ch"); + String path = getFieldValueHierarchy(ch, "path"); + + startedSegmentPath.set(path); + + startedIteratorLatch.countDown(); + + while (it.hasNext()) { + IgniteBiTuple<WALPointer, WALRecord> tup = it.next(); + + WALRecord rec0 = tup.get2(); + + if (rec0.type() == METASTORE_DATA_RECORD) + actualRecords.incrementAndGet(); + + finishedArchivedLatch.await(); + } + + in = getFieldValueHierarchy(handle, "in"); + delegate = getFieldValueHierarchy(in.io(), "delegate"); + ch = getFieldValueHierarchy(delegate, "ch"); + path = getFieldValueHierarchy(ch, "path"); + + finishedSegmentPath.set(path); + } + + return null; + } + ); + + startedIteratorLatch.await(); + + segmentAware.releaseWorkSegment(0); + + waitForCondition(() -> segmentAware.lastArchivedAbsoluteIndex() == 0, 5000); + + finishedArchivedLatch.countDown(); + + future.get(); + + //should started iteration from work directory but finish from archive directory. + assertEquals(workDir + WORK_SUB_DIR + "/0000000000000000.wal", startedSegmentPath.get()); + assertEquals(workDir + ARCHIVE_SUB_DIR + "/0000000000000000.wal", finishedSegmentPath.get()); + + Assert.assertEquals("Not all records read during iteration.", expectedRecords, actualRecords.get()); + } + /*** * Initiate WAL manager. 
* @@ -323,6 +444,7 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest { cfg.setDataStorageConfiguration( new DataStorageConfiguration() .setWalSegmentSize(SEGMENT_SIZE) + .setWalRecordIteratorBufferSize(SEGMENT_SIZE / 2) .setWalMode(WALMode.FSYNC) .setWalPath(workDir + WORK_SUB_DIR) .setWalArchivePath(workDir + ARCHIVE_SUB_DIR) http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java index c077b27..59dd3b7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java @@ -27,7 +27,8 @@ import java.util.concurrent.ThreadLocalRandom; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander; -import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleFileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; @@ -36,7 +37,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaC */ public class IgniteDataIntegrityTests extends TestCase { /** File input. */ - private FileInput fileInput; + private SimpleFileInput fileInput; /** Buffer expander. */ private ByteBufferExpander expBuf; @@ -52,7 +53,7 @@ public class IgniteDataIntegrityTests extends TestCase { FileIOFactory factory = new RandomAccessFileIOFactory(); - fileInput = new FileInput( + fileInput = new SimpleFileInput( factory.create(file), expBuf ); http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java new file mode 100644 index 0000000..8287684 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.persistence.wal.aware; + +import java.util.concurrent.CountDownLatch; +import junit.framework.TestCase; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.testframework.GridTestUtils; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** + * Test for {@link SegmentAware}. + */ +public class SegmentAwareTest extends TestCase { + + /** + * Waiting finished when work segment is set. + */ + public void testFinishAwaitSegment_WhenExactWaitingSegmentWasSet() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + + IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5)); + + //when: set exact awaiting segment. + aware.curAbsWalIdx(5); + + //then: waiting should finish immediately + future.get(20); + } + + /** + * Waiting finished when work segment greater than expected is set. + */ + public void testFinishAwaitSegment_WhenGreaterThanWaitingSegmentWasSet() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + + IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5)); + + //when: set grater than awaiting segment. + aware.curAbsWalIdx(10); + + //then: waiting should finish immediately + future.get(20); + } + + /** + * Waiting finished when work segment is set. + */ + public void testFinishAwaitSegment_WhenNextSegmentEqualToWaitingOne() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. 
+ SegmentAware aware = new SegmentAware(10); + + IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5)); + + //when: set less than awaiting segment. + aware.curAbsWalIdx(4); + + //then: thread still waiting the segment + assertFutureIsNotFinish(future); + + //when: trigger next segment. + aware.nextAbsoluteSegmentIndex(); + + //then: waiting should finish immediately + future.get(20); + } + + /** + * Waiting finished when interrupt was triggered. + */ + public void testFinishAwaitSegment_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + + IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5)); + + //when: interrupt waiting. + aware.interrupt(); + + //then: IgniteInterruptedCheckedException should be throw. + assertTrue(future.get(20) instanceof IgniteInterruptedCheckedException); + } + + /** + * Waiting finished when next work segment triggered. + */ + public void testFinishWaitSegmentForArchive_WhenWorkSegmentIncremented() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + + aware.curAbsWalIdx(5); + aware.setLastArchivedAbsoluteIndex(4); + + IgniteInternalFuture future = awaitThread(aware::waitNextSegmentForArchivation); + + //when: next work segment triggered. + aware.nextAbsoluteSegmentIndex(); + + //then: waiting should finish immediately. + future.get(20); + } + + /** + * Waiting finished when work segment is set. + */ + public void testFinishWaitSegmentForArchive_WhenWorkSegmentGreaterValue() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. 
+ SegmentAware aware = new SegmentAware(10); + + aware.curAbsWalIdx(5); + aware.setLastArchivedAbsoluteIndex(4); + + IgniteInternalFuture future = awaitThread(aware::waitNextSegmentForArchivation); + + //when: set work segment greater than required. + aware.curAbsWalIdx(7); + + //then: waiting should finish immediately. + future.get(20); + } + + /** + * Waiting finished when interrupt was triggered. + */ + public void testFinishWaitSegmentForArchive_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + + aware.curAbsWalIdx(5); + aware.setLastArchivedAbsoluteIndex(4); + + IgniteInternalFuture future = awaitThread(aware::waitNextSegmentForArchivation); + + //when: interrupt waiting. + aware.interrupt(); + + //then: IgniteInterruptedCheckedException should be throw. + assertTrue(future.get(20) instanceof IgniteInterruptedCheckedException); + } + + /** + * Should correct calculate next segment. + */ + public void testCorrectCalculateNextSegmentIndex() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + + aware.curAbsWalIdx(5); + + //when: request next work segment. + long segmentIndex = aware.nextAbsoluteSegmentIndex(); + + //then: + assertThat(segmentIndex, is(6L)); + } + + /** + * Waiting finished when segment archived. + */ + public void testFinishWaitNextAbsoluteIndex_WhenMarkAsArchivedFirstSegment() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(2); + + aware.curAbsWalIdx(1); + aware.setLastArchivedAbsoluteIndex(-1); + + IgniteInternalFuture future = awaitThread(aware::nextAbsoluteSegmentIndex); + + //when: mark first segment as moved. + aware.markAsMovedToArchive(0); + + //then: waiting should finish immediately. 
+ future.get(20); + } + + /** + * Waiting finished when segment archived. + */ + public void testFinishWaitNextAbsoluteIndex_WhenSetToArchivedFirst() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(2); + + aware.curAbsWalIdx(1); + aware.setLastArchivedAbsoluteIndex(-1); + + IgniteInternalFuture future = awaitThread(aware::nextAbsoluteSegmentIndex); + + //when: mark first segment as moved. + aware.setLastArchivedAbsoluteIndex(0); + + //then: waiting should finish immediately. + future.get(20); + } + + /** + * Waiting finished when force interrupt was triggered. + */ + public void testFinishWaitNextAbsoluteIndex_WhenOnlyForceInterruptWasCall() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(2); + + aware.curAbsWalIdx(2); + aware.setLastArchivedAbsoluteIndex(-1); + + IgniteInternalFuture future = awaitThread(aware::nextAbsoluteSegmentIndex); + + //when: interrupt waiting. + aware.interrupt(); + + //then: nothing to happen because nextAbsoluteSegmentIndex is not interrupt by "interrupt" call. + assertFutureIsNotFinish(future); + + //when: force interrupt waiting. + aware.forceInterrupt(); + + //then: IgniteInterruptedCheckedException should be throw. + assertTrue(future.get(20) instanceof IgniteInterruptedCheckedException); + } + + /** + * Waiting finished when segment archived. + */ + public void testFinishSegmentArchived_WhenSetExactWaitingSegment() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + + IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5)); + + //when: archived exact expected segment. + aware.setLastArchivedAbsoluteIndex(5); + + //then: waiting should finish immediately. + future.get(20); + } + + /** + * Waiting finished when segment archived. 
+ */ + public void testFinishSegmentArchived_WhenMarkExactWatingSegment() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + + IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5)); + + //when: mark exact segment as moved. + aware.markAsMovedToArchive(5); + + //then: waiting should finish immediately. + future.get(20); + } + + /** + * Waiting finished when segment archived. + */ + public void testFinishSegmentArchived_WhenSetGreaterThanWatingSegment() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + + IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5)); + + //when: archived greater than expected segment. + aware.setLastArchivedAbsoluteIndex(7); + + //then: waiting should finish immediately. + future.get(20); + } + + /** + * Waiting finished when segment archived. + */ + public void testFinishSegmentArchived_WhenMarkGreaterThanWatingSegment() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + + IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5)); + + //when: moved greater than expected segment. + aware.markAsMovedToArchive(7); + + //then: waiting should finish immediately. + future.get(20); + } + + /** + * Waiting finished when interrupt was triggered. + */ + public void testFinishSegmentArchived_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + + aware.curAbsWalIdx(5); + aware.setLastArchivedAbsoluteIndex(4); + + IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5)); + + //when: interrupt waiting. + aware.interrupt(); + + //then: IgniteInterruptedCheckedException should be throw. 
+ assertTrue(future.get(20) instanceof IgniteInterruptedCheckedException); + } + + /** + * Waiting finished when release work segment. + */ + public void testMarkAsMovedToArchive_WhenReleaseLockedSegment() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + + aware.checkCanReadArchiveOrReserveWorkSegment(5); + + IgniteInternalFuture future = awaitThread(() -> aware.markAsMovedToArchive(5)); + + //when: release exact expected work segment. + aware.releaseWorkSegment(5); + + //then: waiting should finish immediately. + future.get(20); + } + + /** + * Waiting finished and increment archived segment when interrupt was call. + */ + public void testMarkAsMovedToArchive_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + aware.checkCanReadArchiveOrReserveWorkSegment(5); + + IgniteInternalFuture future = awaitThread(() -> aware.markAsMovedToArchive(5)); + + //when: interrupt waiting. + aware.interrupt(); + + //then: IgniteInterruptedCheckedException should be throw. + assertFalse(future.get(20) instanceof IgniteInterruptedCheckedException); + + //and: last archived segment should be changed. + assertEquals(5, aware.lastArchivedAbsoluteIndex()); + } + + /** + * Waiting finished when segment archived. + */ + public void testFinishWaitSegmentToCompress_WhenSetLastArchivedSegment() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + + aware.lastCompressedIdx(5); + + IgniteInternalFuture future = awaitThread(aware::waitNextSegmentToCompress); + + //when: archived expected segment. + aware.setLastArchivedAbsoluteIndex(6); + + //then: waiting should finish immediately. + future.get(20); + } + + /** + * Waiting finished when segment archived. 
+ */ + public void testFinishWaitSegmentToCompress_WhenMarkLastArchivedSegment() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + + aware.lastCompressedIdx(5); + + IgniteInternalFuture future = awaitThread(aware::waitNextSegmentToCompress); + + //when: marked expected segment. + aware.markAsMovedToArchive(6); + + //then: waiting should finish immediately. + future.get(20); + } + + /** + * Next segment for compress based on truncated archive idx. + */ + public void testCorrectCalculateNextCompressSegment() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + + aware.lastCompressedIdx(5); + aware.setLastArchivedAbsoluteIndex(6); + aware.lastTruncatedArchiveIdx(7); + + //when: + long segmentToCompress = aware.waitNextSegmentToCompress(); + + //then: segment to compress greater than truncated archive idx + assertEquals(8, segmentToCompress); + } + + /** + * Waiting finished when interrupt was call. + */ + public void testFinishWaitSegmentToCompress_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + aware.lastCompressedIdx(5); + + IgniteInternalFuture future = awaitThread(aware::waitNextSegmentToCompress); + + //when: interrupt waiting. + aware.interrupt(); + + //then: IgniteInterruptedCheckedException should be throw. + assertTrue(future.get(20) instanceof IgniteInterruptedCheckedException); + } + + /** + * Segment reserve correctly. + */ + public void testReserveCorrectly() { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + + //when: reserve one segment twice and one segment once. + aware.reserve(5); + aware.reserve(5); + aware.reserve(7); + + //then: segments greater than minimum should be reserved. 
+ assertTrue(aware.reserved(5)); + assertTrue(aware.reserved(10)); + assertFalse(aware.reserved(4)); + + //when: release one of twice locked segment. + aware.release(5); + + //then: nothing to change. + assertTrue(aware.reserved(5)); + assertTrue(aware.reserved(10)); + assertFalse(aware.reserved(4)); + + //when: again release one of twice locked segment. + aware.release(5); + + //then: segments greater than second locked segment should be reserved. + assertTrue(aware.reserved(7)); + assertTrue(aware.reserved(10)); + assertFalse(aware.reserved(5)); + assertFalse(aware.reserved(6)); + + //when: release last segment. + aware.release(7); + + //then: all segments should be released. + assertFalse(aware.reserved(7)); + assertFalse(aware.reserved(10)); + assertFalse(aware.reserved(6)); + } + + /** + * Should fail when release unreserved segment. + */ + public void testAssertFail_WhenReleaseUnreservedSegment() { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + + aware.reserve(5); + try { + + aware.release(7); + } + catch (AssertionError e) { + return; + } + + fail("Should fail with AssertError because this segment have not reserved"); + } + + /** + * Segment locked correctly. + */ + public void testReserveWorkSegmentCorrectly() { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + + //when: lock one segment twice. + aware.checkCanReadArchiveOrReserveWorkSegment(5); + aware.checkCanReadArchiveOrReserveWorkSegment(5); + + //then: exact one segment should locked. + assertTrue(aware.locked(5)); + assertFalse(aware.locked(6)); + assertFalse(aware.locked(4)); + + //when: release segment once. + aware.releaseWorkSegment(5); + + //then: nothing to change, segment still locked. + assertTrue(aware.locked(5)); + assertFalse(aware.locked(6)); + assertFalse(aware.locked(4)); + + //when: release segment. + aware.releaseWorkSegment(5); + + //then: all segments should be unlocked. 
+ assertFalse(aware.locked(5)); + assertFalse(aware.locked(6)); + assertFalse(aware.locked(4)); + } + + /** + * Should fail when release unlocked segment. + */ + public void testAssertFail_WhenReleaseUnreservedWorkSegment() { + //given: thread which awaited segment. + SegmentAware aware = new SegmentAware(10); + + aware.checkCanReadArchiveOrReserveWorkSegment(5); + try { + + aware.releaseWorkSegment(7); + } + catch (AssertionError e) { + return; + } + + fail("Should fail with AssertError because this segment have not reserved"); + } + + /** + * Assert that future is still not finished. + * + * @param future Future to check. + */ + private void assertFutureIsNotFinish(IgniteInternalFuture future) throws IgniteCheckedException { + try { + future.get(20); + + fail("Timeout should be appear because thread should be still work"); + } + catch (IgniteFutureTimeoutCheckedException ignore) { + + } + } + + /** + * Create thread for execute waiter. + * + * @param waiter Waiter for execute in new thread. + * @return Future of thread. + */ + private IgniteInternalFuture awaitThread(Waiter waiter) throws IgniteCheckedException, InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + IgniteInternalFuture<Object> future = GridTestUtils.runAsync( + () -> { + latch.countDown(); + try { + waiter.await(); + } + catch (IgniteInterruptedCheckedException e) { + return e; + } + + return null; + } + ); + + latch.await(); + + assertFutureIsNotFinish(future); + + return future; + } + + /** + * Represent of command for waiting. + */ + interface Waiter { + /** + * Some waiting operation. 
+ */ + void await() throws IgniteInterruptedCheckedException; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java index b6a04d0..cf660c8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java @@ -123,7 +123,7 @@ public class StandaloneWalRecordsIteratorTest extends GridCommonAbstractTest { assertTrue("At least one WAL file must be opened!", CountedFileIO.getCountOpenedWalFiles() > 0); - assertEquals("All WAL files must be closed!", CountedFileIO.getCountOpenedWalFiles(), CountedFileIO.getCountClosedWalFiles()); + assertTrue("All WAL files must be closed at least ones!", CountedFileIO.getCountOpenedWalFiles() <= CountedFileIO.getCountClosedWalFiles()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java index e4ea4a9..3c4bc4f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java @@ 
-1313,25 +1313,7 @@ public final class GridTestUtils { assert fieldName != null; try { - // Resolve inner field. - Field field = cls.getDeclaredField(fieldName); - - synchronized (field) { - // Backup accessible field state. - boolean accessible = field.isAccessible(); - - try { - if (!accessible) - field.setAccessible(true); - - obj = field.get(obj); - } - finally { - // Recover accessible field state. - if (!accessible) - field.setAccessible(false); - } - } + obj = findField(cls, obj, fieldName); return (T)obj; } @@ -1361,25 +1343,7 @@ public final class GridTestUtils { Class<?> cls = obj instanceof Class ? (Class)obj : obj.getClass(); try { - // Resolve inner field. - Field field = cls.getDeclaredField(fieldName); - - synchronized (field) { - // Backup accessible field state. - boolean accessible = field.isAccessible(); - - try { - if (!accessible) - field.setAccessible(true); - - obj = field.get(obj); - } - finally { - // Recover accessible field state. - if (!accessible) - field.setAccessible(false); - } - } + obj = findField(cls, obj, fieldName); } catch (NoSuchFieldException e) { // Resolve inner class, if not an inner field. @@ -1402,6 +1366,74 @@ public final class GridTestUtils { } /** + * Get object field value via reflection(including superclass). + * + * @param obj Object or class to get field value from. + * @param fieldNames Field names to get value for: obj->field1->field2->...->fieldN. + * @param <T> Expected field class. + * @return Field value. + * @throws IgniteException In case of error. + */ + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + public static <T> T getFieldValueHierarchy(Object obj, String... fieldNames) throws IgniteException { + assert obj != null; + assert fieldNames != null; + assert fieldNames.length >= 1; + + try { + for (String fieldName : fieldNames) { + Class<?> cls = obj instanceof Class ? 
(Class)obj : obj.getClass(); + + while (cls != null) { + try { + obj = findField(cls, obj, fieldName); + + break; + } + catch (NoSuchFieldException e) { + cls = cls.getSuperclass(); + } + } + } + + return (T)obj; + } + catch (IllegalAccessException e) { + throw new IgniteException("Failed to get object field [obj=" + obj + + ", fieldNames=" + Arrays.toString(fieldNames) + ']', e); + } + } + + /** + * @param cls Class for searching. + * @param obj Target object. + * @param fieldName Field name for search. + * @return Field from object if it was found. + */ + private static Object findField(Class<?> cls, Object obj, + String fieldName) throws NoSuchFieldException, IllegalAccessException { + // Resolve inner field. + Field field = cls.getDeclaredField(fieldName); + + synchronized (field) { + // Backup accessible field state. + boolean accessible = field.isAccessible(); + + try { + if (!accessible) + field.setAccessible(true); + + return field.get(obj); + } + finally { + // Recover accessible field state. + if (!accessible) + field.setAccessible(false); + } + } + } + + /** * Get inner class by its name from the enclosing class. * * @param parentCls Parent class to resolve inner class for. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java index 9f50b32..2b57223 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java @@ -50,6 +50,8 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.CpTriggeredWa import org.apache.ignite.internal.processors.cache.persistence.wal.ExplicitWalDeltaConsistencyTest; import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBufferTest; import org.apache.ignite.internal.processors.cache.persistence.wal.SysPropWalDeltaConsistencyTest; +import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware; +import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAwareTest; import org.apache.ignite.internal.processors.database.IgniteDbDynamicCacheSelfTest; import org.apache.ignite.internal.processors.database.IgniteDbMultiNodePutGetTest; import org.apache.ignite.internal.processors.database.IgniteDbPutGetWithCacheStoreTest; @@ -99,6 +101,8 @@ public class IgnitePdsTestSuite extends TestSuite { // Binary meta tests. 
suite.addTestSuite(IgnitePdsCacheObjectBinaryProcessorOnDiscoveryTest.class); + suite.addTestSuite(SegmentAwareTest.class); + return suite; } http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java index fcbad9b..c2b5dc2 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java @@ -30,6 +30,7 @@ import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.apache.ignite.Ignite; @@ -74,6 +75,7 @@ import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemor import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIO; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.typedef.CA; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.PAX; @@ -123,6 +125,9 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { /** */ private int walSegmentSize; + /** */ + private int walSegments = 10; + /** Log only. 
*/ private boolean logOnly; @@ -176,6 +181,8 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { if (walSegmentSize != 0) dbCfg.setWalSegmentSize(walSegmentSize); + dbCfg.setWalSegments(walSegments); + cfg.setDataStorageConfiguration(dbCfg); BinaryConfiguration binCfg = new BinaryConfiguration(); @@ -446,7 +453,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { Arrays.fill(data, (byte)i); - final byte[] loaded = (byte[]) cache.get(i); + final byte[] loaded = (byte[])cache.get(i); Assert.assertArrayEquals(data, loaded); @@ -913,7 +920,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { sharedCtx0.database().checkpointReadLock(); try { - storage0.putData(String.valueOf(i), new byte[]{(byte)(i % 256), 2, 3}); + storage0.putData(String.valueOf(i), new byte[] {(byte)(i % 256), 2, 3}); } finally { sharedCtx0.database().checkpointReadUnlock(); @@ -973,7 +980,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { for (int i = 0; i < cnt; i++) { byte[] b1 = new byte[arraySize]; for (int k = 0; k < arraySize; k++) { - b1[k] = (byte) (k % 100); + b1[k] = (byte)(k % 100); } sharedCtx.database().checkpointReadLock(); @@ -991,7 +998,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { assertEquals(arraySize, d2.length); for (int k = 0; k < arraySize; k++) { - assertEquals((byte) (k % 100), d2[k]); + assertEquals((byte)(k % 100), d2[k]); } } @@ -1022,7 +1029,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { sharedCtx0.database().checkpointReadLock(); try { - storage.putData(String.valueOf(i), new byte[]{1, 2, 3}); + storage.putData(String.valueOf(i), new byte[] {1, 2, 3}); } finally { sharedCtx0.database().checkpointReadUnlock(); @@ -1075,7 +1082,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { sharedCtx0.database().checkpointReadLock(); try { - storage.putData(String.valueOf(i), new byte[]{1, 2, 3}); + 
storage.putData(String.valueOf(i), new byte[] {1, 2, 3}); } finally { sharedCtx0.database().checkpointReadUnlock(); @@ -1086,7 +1093,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { sharedCtx0.database().checkpointReadLock(); try { - storage.putData(String.valueOf(i), new byte[]{2, 2, 3, 4}); + storage.putData(String.valueOf(i), new byte[] {2, 2, 3, 4}); } finally { sharedCtx0.database().checkpointReadUnlock(); @@ -1127,7 +1134,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { sharedCtx0.database().checkpointReadLock(); try { - storage.putData(String.valueOf(i), new byte[]{1, 2, 3}); + storage.putData(String.valueOf(i), new byte[] {1, 2, 3}); } finally { sharedCtx0.database().checkpointReadUnlock(); @@ -1165,6 +1172,74 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { /** * @throws Exception if failed. */ + public void testAbsentDeadlock_Iterator_RollOver_Archivation() throws Exception { + try { + walSegments = 2; + + walSegmentSize = 512 * 1024; + + IgniteEx ignite0 = (IgniteEx)startGrid("node0"); + + ignite0.active(true); + + IgniteCache<Object, Object> cache0 = ignite0.cache(CACHE_NAME); + + for (int i = 0; i < 100; i++) + cache0.put(i, new IndexedObject(i)); + + GridCacheSharedContext<Object, Object> sharedCtx = ignite0.context().cache().context(); + + GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)sharedCtx.database(); + + db.waitForCheckpoint("test"); + db.enableCheckpoints(false).get(); + + // Log something to know where to start. 
+ WALPointer ptr = sharedCtx.wal().log(new MemoryRecoveryRecord(U.currentTimeMillis())); + + info("Replay marker: " + ptr); + + for (int i = 100; i < 200; i++) + cache0.put(i, new IndexedObject(i)); + + CountDownLatch insertFinished = new CountDownLatch(1); + GridTestUtils.runAsync( + () -> { + try (WALIterator it = sharedCtx.wal().replay(ptr)) { + if (it.hasNext()) { + it.next(); + + insertFinished.await(); + } + } + + return null; + } + ); + + IgniteInternalFuture<Object> future = GridTestUtils.runAsync( + () -> { + for (int i = 0; i < 10000; i++) + cache0.put(i, new IndexedObject(i)); + + return null; + } + ); + + future.get(); + + insertFinished.countDown(); + + ignite0.close(); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception if failed. + */ public void testApplyDeltaRecords() throws Exception { try { IgniteEx ignite0 = (IgniteEx)startGrid("node0"); @@ -1310,12 +1385,12 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { final String cacheName = "transactional"; CacheConfiguration<Object, Object> cacheConfiguration = new CacheConfiguration<>(cacheName) - .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) - .setAffinity(new RendezvousAffinityFunction(false, 32)) - .setCacheMode(CacheMode.PARTITIONED) - .setRebalanceMode(CacheRebalanceMode.SYNC) - .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) - .setBackups(2); + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setAffinity(new RendezvousAffinityFunction(false, 32)) + .setCacheMode(CacheMode.PARTITIONED) + .setRebalanceMode(CacheRebalanceMode.SYNC) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setBackups(2); ignite.createCache(cacheConfiguration); @@ -1329,7 +1404,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { for (int t = 1; t <= transactions; t++) { Transaction tx = ignite.transactions().txStart( - TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED); + 
TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED); Map<Object, Object> changesInTransaction = new HashMap<>(); @@ -1388,12 +1463,12 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { final String cacheName = "transactional"; CacheConfiguration<Object, Object> cacheConfiguration = new CacheConfiguration<>(cacheName) - .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) - .setAffinity(new RendezvousAffinityFunction(false, 32)) - .setCacheMode(CacheMode.PARTITIONED) - .setRebalanceMode(CacheRebalanceMode.SYNC) - .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) - .setBackups(0); + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setAffinity(new RendezvousAffinityFunction(false, 32)) + .setCacheMode(CacheMode.PARTITIONED) + .setRebalanceMode(CacheRebalanceMode.SYNC) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setBackups(0); ignite.createCache(cacheConfiguration); @@ -1416,7 +1491,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { for (int t = 1; t <= transactions; t++) { Transaction tx = ignite.transactions().txStart( - TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED); + TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED); for (int op = 0; op < operationsPerTransaction; op++) { int key = random.nextInt(1000) + 1; @@ -1447,7 +1522,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { WALRecord rec = tup.get2(); if (rec instanceof TxRecord) { - TxRecord txRecord = (TxRecord) rec; + TxRecord txRecord = (TxRecord)rec; GridCacheVersion txId = txRecord.nearXidVersion(); switch (txRecord.state()) { @@ -1470,8 +1545,9 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { default: throw new IllegalStateException("Unknown Tx state of record " + txRecord); } - } else if (rec instanceof DataRecord) { - DataRecord dataRecord = (DataRecord) rec; + } + else if (rec instanceof DataRecord) 
{ + DataRecord dataRecord = (DataRecord)rec; for (DataEntry entry : dataRecord.writeEntries()) { GridCacheVersion txId = entry.nearXidVersion(); @@ -1514,16 +1590,18 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { // Create pseudo-random array. for (int i = 0; i < payload.length; i++) if (i % index == 0) - payload[i] = (byte) index; + payload[i] = (byte)index; } @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - BigObject bigObject = (BigObject) o; + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + BigObject bigObject = (BigObject)o; return index == bigObject.index && - Arrays.equals(payload, bigObject.payload); + Arrays.equals(payload, bigObject.payload); } @Override @@ -1867,7 +1945,6 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { } } - /** * */