IGNITE-10522 Fix checkpoint tmp files not being removed on node start - Fixes #5571.
Signed-off-by: Dmitriy Govorukhin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/472889d2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/472889d2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/472889d2 Branch: refs/heads/ignite-10189 Commit: 472889d2e4f32a0fe7119e5fa44dcbaba84269f7 Parents: 57ccf9b Author: Anton Kalashnikov <[email protected]> Authored: Fri Dec 7 13:33:49 2018 +0300 Committer: Dmitriy Govorukhin <[email protected]> Committed: Fri Dec 7 13:33:49 2018 +0300 ---------------------------------------------------------------------- .../GridCacheDatabaseSharedManager.java | 6 +- .../persistence/file/FilePageStoreManager.java | 6 + .../ignite/internal/util/IgniteUtils.java | 2 +- .../testframework/junits/GridAbstractTest.java | 4 +- .../db/wal/IgniteWalRecoveryTest.java | 1125 ++++++++---------- 5 files changed, 532 insertions(+), 611 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/472889d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 95a99fb..fd56262 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -174,6 +174,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_ import static 
org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_FILE_MATCHER; import static org.apache.ignite.internal.util.IgniteUtils.checkpointBufferSize; /** @@ -537,10 +538,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan */ @Override public void cleanupTempCheckpointDirectory() throws IgniteCheckedException { try { - try (DirectoryStream<Path> files = Files.newDirectoryStream( - cpDir.toPath(), - path -> path.endsWith(FilePageStoreManager.TMP_SUFFIX)) - ) { + try (DirectoryStream<Path> files = Files.newDirectoryStream(cpDir.toPath(), TMP_FILE_MATCHER::matches)) { for (Path path : files) Files.delete(path); } http://git-wip-us.apache.org/repos/asf/ignite/blob/472889d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index 86560ba..3544485 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -27,8 +27,10 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.file.DirectoryStream; +import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.PathMatcher; import java.nio.file.StandardCopyOption; 
import java.util.AbstractList; import java.util.Arrays; @@ -118,6 +120,10 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen /** */ public static final String META_STORAGE_NAME = "metastorage"; + /** Matcher for searching of *.tmp files. */ + public static final PathMatcher TMP_FILE_MATCHER = + FileSystems.getDefault().getPathMatcher("glob:**" + TMP_SUFFIX); + /** Marshaller. */ private static final Marshaller marshaller = new JdkMarshaller(); http://git-wip-us.apache.org/repos/asf/ignite/blob/472889d2/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 6da5c6e..2f8eb1c 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -3585,7 +3585,7 @@ public abstract class IgniteUtils { } } - if (path.endsWith("jar")) { + if (path.toFile().getName().endsWith("jar")) { try { // Why do we do this? 
new JarFile(path.toString(), false).close(); http://git-wip-us.apache.org/repos/asf/ignite/blob/472889d2/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 1db255c..42a2e60 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -17,6 +17,8 @@ package org.apache.ignite.testframework.junits; +import javax.cache.configuration.Factory; +import javax.cache.configuration.FactoryBuilder; import java.io.ObjectStreamException; import java.io.Serializable; import java.lang.annotation.Annotation; @@ -41,8 +43,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import javax.cache.configuration.Factory; -import javax.cache.configuration.FactoryBuilder; import junit.framework.TestCase; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; http://git-wip-us.apache.org/repos/asf/ignite/blob/472889d2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java index 04e90d5..e64800a 100644 --- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java @@ -58,9 +58,9 @@ import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; -import org.apache.ignite.internal.DiscoverySpiTestListener; import org.apache.ignite.failure.FailureHandler; import org.apache.ignite.failure.NoOpFailureHandler; +import org.apache.ignite.internal.DiscoverySpiTestListener; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -109,6 +109,7 @@ import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.junit.Assert; +import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_CHECKPOINT_FREQ; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR; @@ -124,6 +125,12 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { private static final int LARGE_ARR_SIZE = 1025; /** */ + private static final int LARGE_ENTRY_COUNT = 500; + + /** */ + private static final int ENTRY_COUNT = 2_000; + + /** */ private boolean fork; /** */ @@ -139,20 +146,24 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { private static final String LOC_CACHE_NAME = "local"; /** */ - private boolean renamed; + private boolean renamed = false; /** */ - 
private int walSegmentSize; + private int walSegmentSize = 16 * 1024 * 1024; /** */ private int walSegments = 10; /** Log only. */ - private boolean logOnly; + private boolean logOnly = false; /** */ private long customFailureDetectionTimeout = -1; + /** */ + private long checkpointFrequency = DFLT_CHECKPOINT_FREQ; + ; + /** {@inheritDoc} */ @Override protected boolean isMultiJvm() { return fork; @@ -202,6 +213,8 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { dbCfg.setWalSegments(walSegments); + dbCfg.setCheckpointFrequency(checkpointFrequency); + cfg.setDataStorageConfiguration(dbCfg); BinaryConfiguration binCfg = new BinaryConfiguration(); @@ -224,16 +237,12 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { stopAllGrids(); cleanPersistenceDir(); - - renamed = false; } /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { stopAllGrids(); - logOnly = false; - cleanPersistenceDir(); } @@ -326,174 +335,159 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { * @throws Exception If fail. 
*/ public void testSwitchClassLoader() throws Exception { - try { - final IgniteEx igniteEx = startGrid(1); + final IgniteEx igniteEx = startGrid(1); - // CustomDiscoveryMessage will trigger service tasks - startGrid(2); + // CustomDiscoveryMessage will trigger service tasks + startGrid(2); - igniteEx.cluster().active(true); + igniteEx.cluster().active(true); - IgniteCache<Integer, EnumVal> cache = igniteEx.cache(CACHE_NAME); + IgniteCache<Integer, EnumVal> cache = igniteEx.cache(CACHE_NAME); - final ClassLoader oldCl = Thread.currentThread().getContextClassLoader(); - final ClassLoader newCl = getExternalClassLoader(); + final ClassLoader oldCl = Thread.currentThread().getContextClassLoader(); + final ClassLoader newCl = getExternalClassLoader(); - Thread.currentThread().setContextClassLoader(newCl); + Thread.currentThread().setContextClassLoader(newCl); - for (int i = 0; i < 10; i++) - cache.put(i, i % 2 == 0 ? EnumVal.VAL1 : EnumVal.VAL2); + for (int i = 0; i < 10; i++) + cache.put(i, i % 2 == 0 ? EnumVal.VAL1 : EnumVal.VAL2); - for (int i = 0; i < 10; i++) - assert cache.containsKey(i); + for (int i = 0; i < 10; i++) + assert cache.containsKey(i); - // Invokes ClearTask with new class loader - cache.clear(); + // Invokes ClearTask with new class loader + cache.clear(); - Thread.currentThread().setContextClassLoader(oldCl); + Thread.currentThread().setContextClassLoader(oldCl); - for (int i = 0; i < 10; i++) - cache.put(i, i % 2 == 0 ? EnumVal.VAL1 : EnumVal.VAL2); + for (int i = 0; i < 10; i++) + cache.put(i, i % 2 == 0 ? EnumVal.VAL1 : EnumVal.VAL2); - for (int i = 0; i < 10; i++) - assert cache.containsKey(i); - } - finally { - stopAllGrids(); - } + for (int i = 0; i < 10; i++) + assert cache.containsKey(i); } /** * @throws Exception if failed. 
*/ public void testWalSimple() throws Exception { - try { - IgniteEx ignite = startGrid(1); - - ignite.cluster().active(true); - - IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME); - - info(" --> step1"); - - for (int i = 0; i < 10_000; i += 2) - cache.put(i, new IndexedObject(i)); - - info(" --> step2"); + IgniteEx ignite = startGrid(1); - for (int i = 0; i < 10_000; i += 3) - cache.put(i, new IndexedObject(i * 2)); + ignite.cluster().active(true); - info(" --> step3"); + IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME); - for (int i = 0; i < 10_000; i += 7) - cache.put(i, new IndexedObject(i * 3)); + info(" --> step1"); - info(" --> check1"); + for (int i = 0; i < 10_000; i += 2) + cache.put(i, new IndexedObject(i)); - // Check. - for (int i = 0; i < 10_000; i++) { - IndexedObject o; + info(" --> step2"); - if (i % 7 == 0) - o = new IndexedObject(i * 3); - else if (i % 3 == 0) - o = new IndexedObject(i * 2); - else if (i % 2 == 0) - o = new IndexedObject(i); - else - o = null; + for (int i = 0; i < 10_000; i += 3) + cache.put(i, new IndexedObject(i * 2)); - assertEquals(o, cache.get(i)); - } + info(" --> step3"); - stopGrid(1); + for (int i = 0; i < 10_000; i += 7) + cache.put(i, new IndexedObject(i * 3)); - ignite = startGrid(1); + info(" --> check1"); - ignite.cluster().active(true); + // Check. + for (int i = 0; i < 10_000; i++) { + IndexedObject o; + + if (i % 7 == 0) + o = new IndexedObject(i * 3); + else if (i % 3 == 0) + o = new IndexedObject(i * 2); + else if (i % 2 == 0) + o = new IndexedObject(i); + else + o = null; + + assertEquals(o, cache.get(i)); + } - cache = ignite.cache(CACHE_NAME); + stopGrid(1); - info(" --> check2"); + ignite = startGrid(1); - // Check. 
- for (int i = 0; i < 10_000; i++) { - IndexedObject o; + ignite.cluster().active(true); - if (i % 7 == 0) - o = new IndexedObject(i * 3); - else if (i % 3 == 0) - o = new IndexedObject(i * 2); - else if (i % 2 == 0) - o = new IndexedObject(i); - else - o = null; + cache = ignite.cache(CACHE_NAME); - assertEquals(o, cache.get(i)); - } + info(" --> check2"); - info(" --> ok"); - } - finally { - stopAllGrids(); + // Check. + for (int i = 0; i < 10_000; i++) { + IndexedObject o; + + if (i % 7 == 0) + o = new IndexedObject(i * 3); + else if (i % 3 == 0) + o = new IndexedObject(i * 2); + else if (i % 2 == 0) + o = new IndexedObject(i); + else + o = null; + + assertEquals(o, cache.get(i)); } + + info(" --> ok"); } /** * @throws Exception If fail. */ public void testWalLargeValue() throws Exception { - try { - IgniteEx ignite = startGrid(1); + IgniteEx ignite = startGrid(1); - ignite.cluster().active(true); + ignite.cluster().active(true); - IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME); + IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME); - for (int i = 0; i < 10_000; i++) { - final byte[] data = new byte[i]; + for (int i = 0; i < 10_000; i++) { + final byte[] data = new byte[i]; - Arrays.fill(data, (byte)i); + Arrays.fill(data, (byte)i); - cache.put(i, data); + cache.put(i, data); - if (i % 1000 == 0) - X.println(" ---> put: " + i); - } + if (i % 1000 == 0) + X.println(" ---> put: " + i); + } - stopGrid(1); + stopGrid(1); - ignite = startGrid(1); + ignite = startGrid(1); - ignite.cluster().active(true); + ignite.cluster().active(true); - cache = ignite.cache(CACHE_NAME); + cache = ignite.cache(CACHE_NAME); - info(" --> check2"); + info(" --> check2"); - for (int i = 0; i < 10_000; i++) { - final byte[] data = new byte[i]; + for (int i = 0; i < 10_000; i++) { + final byte[] data = new byte[i]; - Arrays.fill(data, (byte)i); + Arrays.fill(data, (byte)i); - final byte[] loaded = (byte[])cache.get(i); + final byte[] loaded = (byte[])cache.get(i); - 
Assert.assertArrayEquals(data, loaded); + Assert.assertArrayEquals(data, loaded); - if (i % 1000 == 0) - X.println(" ---> get: " + i); - } - } - finally { - stopAllGrids(); + if (i % 1000 == 0) + X.println(" ---> get: " + i); } } /** - * Check binary recover completes successfully when node stopped at the middle of checkpoint. - * Destroy cache_data.bin file for particular cache to emulate missing {@link DynamicCacheDescriptor} - * file (binary recovery should complete successfully in this case). + * Check binary recover completes successfully when node stopped at the middle of checkpoint. Destroy cache_data.bin + * file for particular cache to emulate missing {@link DynamicCacheDescriptor} file (binary recovery should complete + * successfully in this case). * * @throws Exception if failed. */ @@ -662,8 +656,6 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { } finally { customFailureDetectionTimeout = prevFDTimeout; - - stopAllGrids(); } } @@ -675,65 +667,55 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { final long endTime = System.currentTimeMillis() + 60 * 1000; - try { - IgniteEx ignite = startGrid(1); + IgniteEx ignite = startGrid(1); - ignite.cluster().active(true); + ignite.cluster().active(true); - final IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME); + final IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME); - GridTestUtils.runMultiThreaded(new Callable<Void>() { - @Override public Void call() { - Random rnd = ThreadLocalRandom.current(); + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() { + Random rnd = ThreadLocalRandom.current(); - while (U.currentTimeMillis() < endTime) - cache.put(rnd.nextInt(50_000), rnd.nextInt()); + while (U.currentTimeMillis() < endTime) + cache.put(rnd.nextInt(50_000), rnd.nextInt()); - return null; - } - }, 16, "put-thread"); - } - finally { - stopAllGrids(); - } + return null; + } + }, 16, "put-thread"); } /** * @throws 
Exception If fail. */ public void testWalRenameDirSimple() throws Exception { - try { - IgniteEx ignite = startGrid(1); + IgniteEx ignite = startGrid(1); - ignite.cluster().active(true); + ignite.cluster().active(true); - IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME); + IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME); - for (int i = 0; i < 100; i++) - cache.put(i, new IndexedObject(i)); + for (int i = 0; i < 100; i++) + cache.put(i, new IndexedObject(i)); - final Object consistentId = ignite.cluster().localNode().consistentId(); + final Object consistentId = ignite.cluster().localNode().consistentId(); - stopGrid(1); + stopGrid(1); - final File cacheDir = cacheDir(CACHE_NAME, consistentId.toString()); + final File cacheDir = cacheDir(CACHE_NAME, consistentId.toString()); - renamed = cacheDir.renameTo(new File(cacheDir.getParent(), "cache-" + RENAMED_CACHE_NAME)); + renamed = cacheDir.renameTo(new File(cacheDir.getParent(), "cache-" + RENAMED_CACHE_NAME)); - assert renamed; + assert renamed; - ignite = startGrid(1); + ignite = startGrid(1); - ignite.cluster().active(true); + ignite.cluster().active(true); - cache = ignite.cache(RENAMED_CACHE_NAME); + cache = ignite.cache(RENAMED_CACHE_NAME); - for (int i = 0; i < 100; i++) - assertEquals(new IndexedObject(i), cache.get(i)); - } - finally { - stopAllGrids(); - } + for (int i = 0; i < 100; i++) + assertEquals(new IndexedObject(i), cache.get(i)); } /** @@ -765,102 +747,92 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { * @throws Exception if failed. 
*/ public void testRecoveryNoCheckpoint() throws Exception { - try { - IgniteEx ctrlGrid = startGrid(0); + IgniteEx ctrlGrid = startGrid(0); - fork = true; + fork = true; - IgniteEx cacheGrid = startGrid(1); + IgniteEx cacheGrid = startGrid(1); - ctrlGrid.cluster().active(true); + ctrlGrid.cluster().active(true); - ctrlGrid.compute(ctrlGrid.cluster().forRemotes()).run(new LoadRunnable(false)); + ctrlGrid.compute(ctrlGrid.cluster().forRemotes()).run(new LoadRunnable(false)); - info("Killing remote process..."); + info("Killing remote process..."); - ((IgniteProcessProxy)cacheGrid).kill(); + ((IgniteProcessProxy)cacheGrid).kill(); - final IgniteEx g0 = ctrlGrid; + final IgniteEx g0 = ctrlGrid; - GridTestUtils.waitForCondition(new PA() { - /** {@inheritDoc} */ - @Override public boolean apply() { - return g0.cluster().nodes().size() == 1; - } - }, getTestTimeout()); + GridTestUtils.waitForCondition(new PA() { + /** {@inheritDoc} */ + @Override public boolean apply() { + return g0.cluster().nodes().size() == 1; + } + }, getTestTimeout()); - fork = false; + fork = false; - // Now start the grid and verify that updates were restored from WAL. - cacheGrid = startGrid(1); + // Now start the grid and verify that updates were restored from WAL. 
+ cacheGrid = startGrid(1); - IgniteCache<Object, Object> cache = cacheGrid.cache(CACHE_NAME); + IgniteCache<Object, Object> cache = cacheGrid.cache(CACHE_NAME); - for (int i = 0; i < 10_000; i++) - assertEquals(new IndexedObject(i), cache.get(i)); + for (int i = 0; i < ENTRY_COUNT; i++) + assertEquals(new IndexedObject(i), cache.get(i)); - List<List<?>> res = cache.query(new SqlFieldsQuery("select count(iVal) from IndexedObject")).getAll(); + List<List<?>> res = cache.query(new SqlFieldsQuery("select count(iVal) from IndexedObject")).getAll(); - assertEquals(1, res.size()); - assertEquals(10_000L, res.get(0).get(0)); + assertEquals(1, res.size()); + assertEquals((long)ENTRY_COUNT, res.get(0).get(0)); - IgniteCache<Object, Object> locCache = cacheGrid.cache(LOC_CACHE_NAME); + IgniteCache<Object, Object> locCache = cacheGrid.cache(LOC_CACHE_NAME); - for (int i = 0; i < 10_000; i++) - assertEquals(new IndexedObject(i), locCache.get(i)); - } - finally { - stopAllGrids(); - } + for (int i = 0; i < ENTRY_COUNT; i++) + assertEquals(new IndexedObject(i), locCache.get(i)); } /** * @throws Exception if failed. 
*/ public void testRecoveryLargeNoCheckpoint() throws Exception { - try { - IgniteEx ctrlGrid = startGrid(0); + IgniteEx ctrlGrid = startGrid(0); - fork = true; + fork = true; - IgniteEx cacheGrid = startGrid(1); + IgniteEx cacheGrid = startGrid(1); - ctrlGrid.cluster().active(true); + ctrlGrid.cluster().active(true); - ctrlGrid.compute(ctrlGrid.cluster().forRemotes()).run(new LargeLoadRunnable(false)); + ctrlGrid.compute(ctrlGrid.cluster().forRemotes()).run(new LargeLoadRunnable(false)); - info("Killing remote process..."); + info("Killing remote process..."); - ((IgniteProcessProxy)cacheGrid).kill(); + ((IgniteProcessProxy)cacheGrid).kill(); - final IgniteEx g0 = ctrlGrid; + final IgniteEx g0 = ctrlGrid; - GridTestUtils.waitForCondition(new PA() { - /** {@inheritDoc} */ - @Override public boolean apply() { - return g0.cluster().nodes().size() == 1; - } - }, getTestTimeout()); + GridTestUtils.waitForCondition(new PA() { + /** {@inheritDoc} */ + @Override public boolean apply() { + return g0.cluster().nodes().size() == 1; + } + }, getTestTimeout()); - fork = false; + fork = false; - // Now start the grid and verify that updates were restored from WAL. - cacheGrid = startGrid(1); + // Now start the grid and verify that updates were restored from WAL. 
+ cacheGrid = startGrid(1); - IgniteCache<Object, Object> cache = cacheGrid.cache(CACHE_NAME); - IgniteCache<Object, Object> locCache = cacheGrid.cache(LOC_CACHE_NAME); + IgniteCache<Object, Object> cache = cacheGrid.cache(CACHE_NAME); + IgniteCache<Object, Object> locCache = cacheGrid.cache(LOC_CACHE_NAME); - for (int i = 0; i < 1000; i++) { - final long[] data = new long[LARGE_ARR_SIZE]; + for (int i = 0; i < LARGE_ENTRY_COUNT; i++) { + final long[] data = new long[LARGE_ARR_SIZE]; - Arrays.fill(data, i); + Arrays.fill(data, i); - Assert.assertArrayEquals(data, (long[])cache.get(i)); - Assert.assertArrayEquals(data, (long[])locCache.get(i)); - } - } - finally { - stopAllGrids(); + Assert.assertArrayEquals(data, (long[])cache.get(i)); + Assert.assertArrayEquals(data, (long[])locCache.get(i)); } } @@ -873,76 +845,70 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { * @throws Exception if failed. */ public void testRandomCrash() throws Exception { - try { - IgniteEx ctrlGrid = startGrid(0); + checkpointFrequency = 2_000 + new Random().nextInt(4_000); - fork = true; + IgniteEx ctrlGrid = startGrid(0); - IgniteEx cacheGrid = startGrid(1); + fork = true; - ctrlGrid.cluster().active(true); + IgniteEx cacheGrid = startGrid(1); - IgniteCompute rmt = ctrlGrid.compute(ctrlGrid.cluster().forRemotes()); + ctrlGrid.cluster().active(true); - rmt.run(new LoadRunnable(false)); + IgniteCompute rmt = ctrlGrid.compute(ctrlGrid.cluster().forRemotes()); - info(">>> Finished cache population."); + rmt.run(new LoadRunnable(false)); - rmt.run(new AsyncLoadRunnable()); + info(">>> Finished cache population."); - Thread.sleep(20_000); + rmt.run(new AsyncLoadRunnable()); - info(">>> Killing remote process..."); + Thread.sleep(5_000); - ((IgniteProcessProxy)cacheGrid).kill(); + info(">>> Killing remote process..."); - startGrid(1); + ((IgniteProcessProxy)cacheGrid).kill(); - Boolean res = rmt.call(new VerifyCallable()); + startGrid(1); - assertTrue(res); - } - finally { 
- stopAllGrids(); - } + Boolean res = rmt.call(new VerifyCallable()); + + assertTrue(res); } /** * @throws Exception if failed. */ public void testLargeRandomCrash() throws Exception { - try { - IgniteEx ctrlGrid = startGrid(0); + checkpointFrequency = 2_000 + new Random().nextInt(4_000); - fork = true; + IgniteEx ctrlGrid = startGrid(0); - IgniteEx cacheGrid = startGrid(1); + fork = true; - ctrlGrid.cluster().active(true); + IgniteEx cacheGrid = startGrid(1); - IgniteCompute rmt = ctrlGrid.compute(ctrlGrid.cluster().forRemotes()); + ctrlGrid.cluster().active(true); - rmt.run(new LargeLoadRunnable(false)); + IgniteCompute rmt = ctrlGrid.compute(ctrlGrid.cluster().forRemotes()); - info(">>> Finished cache population."); + rmt.run(new LargeLoadRunnable(false)); - rmt.run(new AsyncLargeLoadRunnable()); + info(">>> Finished cache population."); - Thread.sleep(20_000); + rmt.run(new AsyncLargeLoadRunnable()); - info(">>> Killing remote process..."); + Thread.sleep(5_000); - ((IgniteProcessProxy)cacheGrid).kill(); + info(">>> Killing remote process..."); - startGrid(1); + ((IgniteProcessProxy)cacheGrid).kill(); - Boolean res = rmt.call(new VerifyLargeCallable()); + startGrid(1); - assertTrue(res); - } - finally { - stopAllGrids(); - } + Boolean res = rmt.call(new VerifyLargeCallable()); + + assertTrue(res); } /** @@ -959,68 +925,58 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { * @throws Exception If failed. 
*/ public void testDestroyCache() throws Exception { - try { - IgniteEx ignite = startGrid(1); + IgniteEx ignite = startGrid(1); - ignite.cluster().active(true); + ignite.cluster().active(true); - IgniteCache<Object, Object> cache = ignite.getOrCreateCache("test"); + IgniteCache<Object, Object> cache = ignite.getOrCreateCache("test"); - cache.put(1, new IndexedObject(1)); + cache.put(1, new IndexedObject(1)); - ignite.destroyCache("test"); + ignite.destroyCache("test"); - cache = ignite.getOrCreateCache("test"); + cache = ignite.getOrCreateCache("test"); - // No entry available after cache destroy. - assertNull(cache.get(1)); - } - finally { - stopAllGrids(); - } + // No entry available after cache destroy. + assertNull(cache.get(1)); } /** * @throws Exception If fail. */ public void testEvictPartition() throws Exception { - try { - Ignite ignite1 = startGrid("node1"); + Ignite ignite1 = startGrid("node1"); - ignite1.cluster().active(true); + ignite1.cluster().active(true); - IgniteCache<Object, Object> cache1 = ignite1.cache(CACHE_NAME); + IgniteCache<Object, Object> cache1 = ignite1.cache(CACHE_NAME); - for (int i = 0; i < 100; i++) - cache1.put(i, new IndexedObject(i)); + for (int i = 0; i < 100; i++) + cache1.put(i, new IndexedObject(i)); - Ignite ignite2 = startGrid("node2"); + Ignite ignite2 = startGrid("node2"); - IgniteCache<Object, Object> cache2 = ignite2.cache(CACHE_NAME); + IgniteCache<Object, Object> cache2 = ignite2.cache(CACHE_NAME); - for (int i = 0; i < 100; i++) { - assertEquals(new IndexedObject(i), cache1.get(i)); - assertEquals(new IndexedObject(i), cache2.get(i)); - } + for (int i = 0; i < 100; i++) { + assertEquals(new IndexedObject(i), cache1.get(i)); + assertEquals(new IndexedObject(i), cache2.get(i)); + } - ignite1.close(); - ignite2.close(); + ignite1.close(); + ignite2.close(); - ignite1 = startGrid("node1"); - ignite2 = startGrid("node2"); + ignite1 = startGrid("node1"); + ignite2 = startGrid("node2"); - ignite1.cluster().active(true); 
+ ignite1.cluster().active(true); - cache1 = ignite1.cache(CACHE_NAME); - cache2 = ignite2.cache(CACHE_NAME); + cache1 = ignite1.cache(CACHE_NAME); + cache2 = ignite2.cache(CACHE_NAME); - for (int i = 0; i < 100; i++) { - assertEquals(new IndexedObject(i), cache1.get(i)); - assertEquals(new IndexedObject(i), cache2.get(i)); - } - } - finally { - stopAllGrids(); + for (int i = 0; i < 100; i++) { + assertEquals(new IndexedObject(i), cache1.get(i)); + assertEquals(new IndexedObject(i), cache2.get(i)); } } @@ -1028,64 +984,58 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { * @throws Exception If fail. */ public void testMetastorage() throws Exception { - try { - int cnt = 5000; + int cnt = 5000; - IgniteEx ignite0 = (IgniteEx)startGrid("node1"); - IgniteEx ignite1 = (IgniteEx)startGrid("node2"); + IgniteEx ignite0 = (IgniteEx)startGrid("node1"); + IgniteEx ignite1 = (IgniteEx)startGrid("node2"); - ignite1.cluster().active(true); + ignite1.cluster().active(true); - GridCacheSharedContext<Object, Object> sharedCtx0 = ignite0.context().cache().context(); - GridCacheSharedContext<Object, Object> sharedCtx1 = ignite1.context().cache().context(); + GridCacheSharedContext<Object, Object> sharedCtx0 = ignite0.context().cache().context(); + GridCacheSharedContext<Object, Object> sharedCtx1 = ignite1.context().cache().context(); - MetaStorage storage0 = sharedCtx0.database().metaStorage(); - MetaStorage storage1 = sharedCtx1.database().metaStorage(); + MetaStorage storage0 = sharedCtx0.database().metaStorage(); + MetaStorage storage1 = sharedCtx1.database().metaStorage(); - assert storage0 != null; + assert storage0 != null; - for (int i = 0; i < cnt; i++) { - sharedCtx0.database().checkpointReadLock(); + for (int i = 0; i < cnt; i++) { + sharedCtx0.database().checkpointReadLock(); - try { - storage0.putData(String.valueOf(i), new byte[] {(byte)(i % 256), 2, 3}); - } - finally { - sharedCtx0.database().checkpointReadUnlock(); - } + try { + 
storage0.putData(String.valueOf(i), new byte[] {(byte)(i % 256), 2, 3}); + } + finally { + sharedCtx0.database().checkpointReadUnlock(); + } - byte[] b1 = new byte[i + 3]; - b1[0] = 1; - b1[1] = 2; - b1[2] = 3; + byte[] b1 = new byte[i + 3]; + b1[0] = 1; + b1[1] = 2; + b1[2] = 3; - sharedCtx1.database().checkpointReadLock(); + sharedCtx1.database().checkpointReadLock(); - try { - storage1.putData(String.valueOf(i), b1); - } - finally { - sharedCtx1.database().checkpointReadUnlock(); - } + try { + storage1.putData(String.valueOf(i), b1); } - - for (int i = 0; i < cnt; i++) { - byte[] d1 = storage0.getData(String.valueOf(i)); - assertEquals(3, d1.length); - assertEquals((byte)(i % 256), d1[0]); - assertEquals(2, d1[1]); - assertEquals(3, d1[2]); - - byte[] d2 = storage1.getData(String.valueOf(i)); - assertEquals(i + 3, d2.length); - assertEquals(1, d2[0]); - assertEquals(2, d2[1]); - assertEquals(3, d2[2]); + finally { + sharedCtx1.database().checkpointReadUnlock(); } - } - finally { - stopAllGrids(); + + for (int i = 0; i < cnt; i++) { + byte[] d1 = storage0.getData(String.valueOf(i)); + assertEquals(3, d1.length); + assertEquals((byte)(i % 256), d1[0]); + assertEquals(2, d1[1]); + assertEquals(3, d1[2]); + + byte[] d2 = storage1.getData(String.valueOf(i)); + assertEquals(i + 3, d2.length); + assertEquals(1, d2[0]); + assertEquals(2, d2[1]); + assertEquals(3, d2[2]); } } @@ -1093,46 +1043,40 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { * @throws Exception If fail. 
*/ public void testMetastorageLargeArray() throws Exception { - try { - int cnt = 5000; - int arraySize = 32_768; + int cnt = 5000; + int arraySize = 32_768; - IgniteEx ignite = (IgniteEx)startGrid("node1"); + IgniteEx ignite = (IgniteEx)startGrid("node1"); - ignite.cluster().active(true); + ignite.cluster().active(true); - GridCacheSharedContext<Object, Object> sharedCtx = ignite.context().cache().context(); + GridCacheSharedContext<Object, Object> sharedCtx = ignite.context().cache().context(); - MetaStorage storage = sharedCtx.database().metaStorage(); + MetaStorage storage = sharedCtx.database().metaStorage(); - for (int i = 0; i < cnt; i++) { - byte[] b1 = new byte[arraySize]; - for (int k = 0; k < arraySize; k++) { - b1[k] = (byte)(k % 100); - } + for (int i = 0; i < cnt; i++) { + byte[] b1 = new byte[arraySize]; + for (int k = 0; k < arraySize; k++) { + b1[k] = (byte)(k % 100); + } - sharedCtx.database().checkpointReadLock(); + sharedCtx.database().checkpointReadLock(); - try { - storage.putData(String.valueOf(i), b1); - } - finally { - sharedCtx.database().checkpointReadUnlock(); - } + try { + storage.putData(String.valueOf(i), b1); } + finally { + sharedCtx.database().checkpointReadUnlock(); + } + } - for (int i = 0; i < cnt; i++) { - byte[] d2 = storage.getData(String.valueOf(i)); - assertEquals(arraySize, d2.length); + for (int i = 0; i < cnt; i++) { + byte[] d2 = storage.getData(String.valueOf(i)); + assertEquals(arraySize, d2.length); - for (int k = 0; k < arraySize; k++) { - assertEquals((byte)(k % 100), d2[k]); - } + for (int k = 0; k < arraySize; k++) { + assertEquals((byte)(k % 100), d2[k]); } - - } - finally { - stopAllGrids(); } } @@ -1140,52 +1084,46 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { * @throws Exception If fail. 
*/ public void testMetastorageRemove() throws Exception { - try { - int cnt = 400; + int cnt = 400; - IgniteEx ignite0 = (IgniteEx)startGrid("node1"); + IgniteEx ignite0 = (IgniteEx)startGrid("node1"); - ignite0.cluster().active(true); + ignite0.cluster().active(true); - GridCacheSharedContext<Object, Object> sharedCtx0 = ignite0.context().cache().context(); + GridCacheSharedContext<Object, Object> sharedCtx0 = ignite0.context().cache().context(); - MetaStorage storage = sharedCtx0.database().metaStorage(); + MetaStorage storage = sharedCtx0.database().metaStorage(); - assert storage != null; + assert storage != null; - for (int i = 0; i < cnt; i++) { - sharedCtx0.database().checkpointReadLock(); + for (int i = 0; i < cnt; i++) { + sharedCtx0.database().checkpointReadLock(); - try { - storage.putData(String.valueOf(i), new byte[] {1, 2, 3}); - } - finally { - sharedCtx0.database().checkpointReadUnlock(); - } + try { + storage.putData(String.valueOf(i), new byte[] {1, 2, 3}); } + finally { + sharedCtx0.database().checkpointReadUnlock(); + } + } - for (int i = 0; i < 10; i++) { - sharedCtx0.database().checkpointReadLock(); + for (int i = 0; i < 10; i++) { + sharedCtx0.database().checkpointReadLock(); - try { - storage.removeData(String.valueOf(i)); - } - finally { - sharedCtx0.database().checkpointReadUnlock(); - } + try { + storage.removeData(String.valueOf(i)); } - - for (int i = 10; i < cnt; i++) { - byte[] d1 = storage.getData(String.valueOf(i)); - assertEquals(3, d1.length); - assertEquals(1, d1[0]); - assertEquals(2, d1[1]); - assertEquals(3, d1[2]); + finally { + sharedCtx0.database().checkpointReadUnlock(); } - } - finally { - stopAllGrids(); + + for (int i = 10; i < cnt; i++) { + byte[] d1 = storage.getData(String.valueOf(i)); + assertEquals(3, d1.length); + assertEquals(1, d1[0]); + assertEquals(2, d1[1]); + assertEquals(3, d1[2]); } } @@ -1245,55 +1183,50 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { * @throws Exception If fail. 
*/ public void testMetastorageWalRestore() throws Exception { - try { - int cnt = 2000; + int cnt = 2000; - IgniteEx ignite0 = startGrid(0); + IgniteEx ignite0 = startGrid(0); - ignite0.cluster().active(true); + ignite0.cluster().active(true); - GridCacheSharedContext<Object, Object> sharedCtx0 = ignite0.context().cache().context(); + GridCacheSharedContext<Object, Object> sharedCtx0 = ignite0.context().cache().context(); - MetaStorage storage = sharedCtx0.database().metaStorage(); + MetaStorage storage = sharedCtx0.database().metaStorage(); - assert storage != null; + assert storage != null; - for (int i = 0; i < cnt; i++) { - sharedCtx0.database().checkpointReadLock(); + for (int i = 0; i < cnt; i++) { + sharedCtx0.database().checkpointReadLock(); - try { - storage.putData(String.valueOf(i), new byte[] {1, 2, 3}); - } - finally { - sharedCtx0.database().checkpointReadUnlock(); - } + try { + storage.putData(String.valueOf(i), new byte[] {1, 2, 3}); } - - for (int i = 0; i < cnt; i++) { - byte[] value = storage.getData(String.valueOf(i)); - assert value != null; - assert value.length == 3; + finally { + sharedCtx0.database().checkpointReadUnlock(); } + } + + for (int i = 0; i < cnt; i++) { + byte[] value = storage.getData(String.valueOf(i)); + assert value != null; + assert value.length == 3; + } - stopGrid(0); + stopGrid(0); - ignite0 = startGrid(0); + ignite0 = startGrid(0); - ignite0.cluster().active(true); + ignite0.cluster().active(true); - sharedCtx0 = ignite0.context().cache().context(); + sharedCtx0 = ignite0.context().cache().context(); - storage = sharedCtx0.database().metaStorage(); + storage = sharedCtx0.database().metaStorage(); - assert storage != null; + assert storage != null; - for (int i = 0; i < cnt; i++) { - byte[] value = storage.getData(String.valueOf(i)); - assert value != null; - } - } - finally { - stopAllGrids(); + for (int i = 0; i < cnt; i++) { + byte[] value = storage.getData(String.valueOf(i)); + assert value != null; } } @@ -1301,203 
+1234,193 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { * @throws Exception if failed. */ public void testAbsentDeadlock_Iterator_RollOver_Archivation() throws Exception { - try { - walSegments = 2; + walSegments = 2; - walSegmentSize = 512 * 1024; + walSegmentSize = 512 * 1024; - IgniteEx ignite0 = (IgniteEx)startGrid("node0"); + IgniteEx ignite0 = (IgniteEx)startGrid("node0"); - ignite0.active(true); + ignite0.active(true); - IgniteCache<Object, Object> cache0 = ignite0.cache(CACHE_NAME); + IgniteCache<Object, Object> cache0 = ignite0.cache(CACHE_NAME); - for (int i = 0; i < 100; i++) - cache0.put(i, new IndexedObject(i)); + for (int i = 0; i < 100; i++) + cache0.put(i, new IndexedObject(i)); - GridCacheSharedContext<Object, Object> sharedCtx = ignite0.context().cache().context(); + GridCacheSharedContext<Object, Object> sharedCtx = ignite0.context().cache().context(); - GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)sharedCtx.database(); + GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)sharedCtx.database(); - db.waitForCheckpoint("test"); - db.enableCheckpoints(false).get(); + db.waitForCheckpoint("test"); + db.enableCheckpoints(false).get(); - // Log something to know where to start. - WALPointer ptr = sharedCtx.wal().log(new MemoryRecoveryRecord(U.currentTimeMillis())); + // Log something to know where to start. 
+ WALPointer ptr = sharedCtx.wal().log(new MemoryRecoveryRecord(U.currentTimeMillis())); - info("Replay marker: " + ptr); + info("Replay marker: " + ptr); - for (int i = 100; i < 200; i++) - cache0.put(i, new IndexedObject(i)); + for (int i = 100; i < 200; i++) + cache0.put(i, new IndexedObject(i)); - CountDownLatch insertFinished = new CountDownLatch(1); - GridTestUtils.runAsync( - () -> { - try (WALIterator it = sharedCtx.wal().replay(ptr)) { - if (it.hasNext()) { - it.next(); + CountDownLatch insertFinished = new CountDownLatch(1); + GridTestUtils.runAsync( + () -> { + try (WALIterator it = sharedCtx.wal().replay(ptr)) { + if (it.hasNext()) { + it.next(); - insertFinished.await(); - } + insertFinished.await(); } - - return null; } - ); - IgniteInternalFuture<Object> future = GridTestUtils.runAsync( - () -> { - for (int i = 0; i < 10000; i++) - cache0.put(i, new IndexedObject(i)); + return null; + } + ); - return null; - } - ); + IgniteInternalFuture<Object> future = GridTestUtils.runAsync( + () -> { + for (int i = 0; i < 10000; i++) + cache0.put(i, new IndexedObject(i)); - future.get(); + return null; + } + ); - insertFinished.countDown(); + future.get(); - ignite0.close(); - } - finally { - stopAllGrids(); - } + insertFinished.countDown(); + + ignite0.close(); } /** * @throws Exception if failed. 
*/ public void testApplyDeltaRecords() throws Exception { - try { - IgniteEx ignite0 = (IgniteEx)startGrid("node0"); + IgniteEx ignite0 = (IgniteEx)startGrid("node0"); - ignite0.cluster().active(true); + ignite0.cluster().active(true); - IgniteCache<Object, Object> cache0 = ignite0.cache(CACHE_NAME); + IgniteCache<Object, Object> cache0 = ignite0.cache(CACHE_NAME); - for (int i = 0; i < 1000; i++) - cache0.put(i, new IndexedObject(i)); + for (int i = 0; i < 1000; i++) + cache0.put(i, new IndexedObject(i)); - GridCacheSharedContext<Object, Object> sharedCtx = ignite0.context().cache().context(); + GridCacheSharedContext<Object, Object> sharedCtx = ignite0.context().cache().context(); - GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)sharedCtx.database(); + GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)sharedCtx.database(); - db.waitForCheckpoint("test"); - db.enableCheckpoints(false).get(); + db.waitForCheckpoint("test"); + db.enableCheckpoints(false).get(); - // Log something to know where to start. - WALPointer ptr = sharedCtx.wal().log(new MemoryRecoveryRecord(U.currentTimeMillis())); + // Log something to know where to start. 
+ WALPointer ptr = sharedCtx.wal().log(new MemoryRecoveryRecord(U.currentTimeMillis())); - info("Replay marker: " + ptr); + info("Replay marker: " + ptr); - for (int i = 1000; i < 5000; i++) - cache0.put(i, new IndexedObject(i)); + for (int i = 1000; i < 5000; i++) + cache0.put(i, new IndexedObject(i)); - info("Done puts..."); + info("Done puts..."); - for (int i = 2_000; i < 3_000; i++) - cache0.remove(i); + for (int i = 2_000; i < 3_000; i++) + cache0.remove(i); - info("Done removes..."); + info("Done removes..."); - for (int i = 5000; i < 6000; i++) - cache0.put(i, new IndexedObject(i)); + for (int i = 5000; i < 6000; i++) + cache0.put(i, new IndexedObject(i)); - info("Done puts..."); + info("Done puts..."); - Map<FullPageId, byte[]> rolledPages = new HashMap<>(); + Map<FullPageId, byte[]> rolledPages = new HashMap<>(); - int pageSize = sharedCtx.database().pageSize(); + int pageSize = sharedCtx.database().pageSize(); - ByteBuffer buf = ByteBuffer.allocateDirect(pageSize); + ByteBuffer buf = ByteBuffer.allocateDirect(pageSize); - // Now check that deltas can be correctly applied. - try (WALIterator it = sharedCtx.wal().replay(ptr)) { - while (it.hasNext()) { - IgniteBiTuple<WALPointer, WALRecord> tup = it.next(); + // Now check that deltas can be correctly applied. 
+ try (WALIterator it = sharedCtx.wal().replay(ptr)) { + while (it.hasNext()) { + IgniteBiTuple<WALPointer, WALRecord> tup = it.next(); - WALRecord rec = tup.get2(); + WALRecord rec = tup.get2(); - if (rec instanceof PageSnapshot) { - PageSnapshot page = (PageSnapshot)rec; + if (rec instanceof PageSnapshot) { + PageSnapshot page = (PageSnapshot)rec; - rolledPages.put(page.fullPageId(), page.pageData()); - } - else if (rec instanceof PageDeltaRecord) { - PageDeltaRecord delta = (PageDeltaRecord)rec; + rolledPages.put(page.fullPageId(), page.pageData()); + } + else if (rec instanceof PageDeltaRecord) { + PageDeltaRecord delta = (PageDeltaRecord)rec; - FullPageId fullId = new FullPageId(delta.pageId(), delta.groupId()); + FullPageId fullId = new FullPageId(delta.pageId(), delta.groupId()); - byte[] pageData = rolledPages.get(fullId); + byte[] pageData = rolledPages.get(fullId); - if (pageData == null) { - pageData = new byte[pageSize]; + if (pageData == null) { + pageData = new byte[pageSize]; - rolledPages.put(fullId, pageData); - } + rolledPages.put(fullId, pageData); + } - assertNotNull("Missing page snapshot [page=" + fullId + ", delta=" + delta + ']', pageData); + assertNotNull("Missing page snapshot [page=" + fullId + ", delta=" + delta + ']', pageData); - buf.order(ByteOrder.nativeOrder()); + buf.order(ByteOrder.nativeOrder()); - buf.position(0); - buf.put(pageData); - buf.position(0); + buf.position(0); + buf.put(pageData); + buf.position(0); - delta.applyDelta(sharedCtx.database().dataRegion(null).pageMemory(), - GridUnsafe.bufferAddress(buf)); + delta.applyDelta(sharedCtx.database().dataRegion(null).pageMemory(), + GridUnsafe.bufferAddress(buf)); - buf.position(0); + buf.position(0); - buf.get(pageData); - } + buf.get(pageData); } } + } - info("Done apply..."); + info("Done apply..."); - PageMemoryEx pageMem = (PageMemoryEx)db.dataRegion(null).pageMemory(); + PageMemoryEx pageMem = (PageMemoryEx)db.dataRegion(null).pageMemory(); - for (Map.Entry<FullPageId, 
byte[]> entry : rolledPages.entrySet()) { - FullPageId fullId = entry.getKey(); + for (Map.Entry<FullPageId, byte[]> entry : rolledPages.entrySet()) { + FullPageId fullId = entry.getKey(); - ignite0.context().cache().context().database().checkpointReadLock(); + ignite0.context().cache().context().database().checkpointReadLock(); + + try { + long page = pageMem.acquirePage(fullId.groupId(), fullId.pageId(), true); try { - long page = pageMem.acquirePage(fullId.groupId(), fullId.pageId(), true); + long bufPtr = pageMem.writeLock(fullId.groupId(), fullId.pageId(), page, true); try { - long bufPtr = pageMem.writeLock(fullId.groupId(), fullId.pageId(), page, true); + byte[] data = entry.getValue(); - try { - byte[] data = entry.getValue(); + for (int i = 0; i < data.length; i++) { + if (fullId.pageId() == TrackingPageIO.VERSIONS.latest().trackingPageFor(fullId.pageId(), db.pageSize())) + continue; // Skip tracking pages. - for (int i = 0; i < data.length; i++) { - if (fullId.pageId() == TrackingPageIO.VERSIONS.latest().trackingPageFor(fullId.pageId(), db.pageSize())) - continue; // Skip tracking pages. 
- - assertEquals("page=" + fullId + ", pos=" + i, PageUtils.getByte(bufPtr, i), data[i]); - } - } - finally { - pageMem.writeUnlock(fullId.groupId(), fullId.pageId(), page, null, false, true); + assertEquals("page=" + fullId + ", pos=" + i, PageUtils.getByte(bufPtr, i), data[i]); } } finally { - pageMem.releasePage(fullId.groupId(), fullId.pageId(), page); + pageMem.writeUnlock(fullId.groupId(), fullId.pageId(), page, null, false, true); } } finally { - ignite0.context().cache().context().database().checkpointReadUnlock(); + pageMem.releasePage(fullId.groupId(), fullId.pageId(), page); } } - - ignite0.close(); - } - finally { - stopAllGrids(); + finally { + ignite0.context().cache().context().database().checkpointReadUnlock(); + } } + + ignite0.close(); } /** @@ -1506,73 +1429,68 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { * @throws Exception If fail. */ public void testRecoveryOnTransactionalAndPartitionedCache() throws Exception { - IgniteEx ignite = (IgniteEx) startGrids(3); + IgniteEx ignite = (IgniteEx)startGrids(3); ignite.cluster().active(true); - try { - final String cacheName = "transactional"; + final String cacheName = "transactional"; - CacheConfiguration<Object, Object> cacheConfiguration = new CacheConfiguration<>(cacheName) - .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) - .setAffinity(new RendezvousAffinityFunction(false, 32)) - .setCacheMode(CacheMode.PARTITIONED) - .setRebalanceMode(CacheRebalanceMode.SYNC) - .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) - .setBackups(2); + CacheConfiguration<Object, Object> cacheConfiguration = new CacheConfiguration<>(cacheName) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setAffinity(new RendezvousAffinityFunction(false, 32)) + .setCacheMode(CacheMode.PARTITIONED) + .setRebalanceMode(CacheRebalanceMode.SYNC) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setBackups(2); - ignite.createCache(cacheConfiguration); + 
ignite.createCache(cacheConfiguration); - IgniteCache<Object, Object> cache = ignite.cache(cacheName); - Map<Object, Object> map = new HashMap<>(); + IgniteCache<Object, Object> cache = ignite.cache(cacheName); + Map<Object, Object> map = new HashMap<>(); - final int transactions = 100; - final int operationsPerTransaction = 40; + final int transactions = 100; + final int operationsPerTransaction = 40; - Random random = new Random(); + Random random = new Random(); - for (int t = 1; t <= transactions; t++) { - Transaction tx = ignite.transactions().txStart( - TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED); + for (int t = 1; t <= transactions; t++) { + Transaction tx = ignite.transactions().txStart( + TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED); - Map<Object, Object> changesInTransaction = new HashMap<>(); - - for (int op = 0; op < operationsPerTransaction; op++) { - int key = random.nextInt(1000) + 1; + Map<Object, Object> changesInTransaction = new HashMap<>(); - Object value = random.nextBoolean() ? randomString(random) + key : new BigObject(key); + for (int op = 0; op < operationsPerTransaction; op++) { + int key = random.nextInt(1000) + 1; - changesInTransaction.put(key, value); + Object value = random.nextBoolean() ? 
randomString(random) + key : new BigObject(key); - cache.put(key, value); - } + changesInTransaction.put(key, value); - if (random.nextBoolean()) { - tx.commit(); - map.putAll(changesInTransaction); - } - else { - tx.rollback(); - } + cache.put(key, value); + } - if (t % 50 == 0) - log.info("Finished transaction " + t); + if (random.nextBoolean()) { + tx.commit(); + map.putAll(changesInTransaction); + } + else { + tx.rollback(); } - stopAllGrids(); + if (t % 50 == 0) + log.info("Finished transaction " + t); + } - ignite = (IgniteEx) startGrids(3); - ignite.cluster().active(true); + stopAllGrids(); - cache = ignite.cache(cacheName); + ignite = (IgniteEx)startGrids(3); + ignite.cluster().active(true); - for (Object key : map.keySet()) { - Object expectedValue = map.get(key); - Object actualValue = cache.get(key); - Assert.assertEquals("Unexpected value for key " + key, expectedValue, actualValue); - } - } - finally { - stopAllGrids(); + cache = ignite.cache(cacheName); + + for (Object key : map.keySet()) { + Object expectedValue = map.get(key); + Object actualValue = cache.get(key); + Assert.assertEquals("Unexpected value for key " + key, expectedValue, actualValue); } } @@ -1584,7 +1502,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { public void testTxRecordsConsistency() throws Exception { System.setProperty(IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS, "true"); - IgniteEx ignite = (IgniteEx) startGrids(3); + IgniteEx ignite = (IgniteEx)startGrids(3); ignite.cluster().active(true); try { @@ -1688,7 +1606,6 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { } finally { System.clearProperty(IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS); - stopAllGrids(); } } @@ -1801,7 +1718,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME); IgniteCache<Object, Object> locCache = ignite.cache(LOC_CACHE_NAME); - for (int i = 0; i < 10_000; i++) { + 
for (int i = 0; i < ENTRY_COUNT; i++) { cache.put(i, new IndexedObject(i)); locCache.put(i, new IndexedObject(i)); } @@ -1888,7 +1805,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME); IgniteCache<Object, Object> locCache = ignite.cache(LOC_CACHE_NAME); - for (int i = 0; i < 10_000; i++) { + for (int i = 0; i < ENTRY_COUNT; i++) { { Object val = cache.get(i); @@ -1959,7 +1876,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME); IgniteCache<Object, Object> locCache = ignite.cache(LOC_CACHE_NAME); - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < LARGE_ENTRY_COUNT; i++) { final long[] data = new long[LARGE_ARR_SIZE]; Arrays.fill(data, i); @@ -2053,7 +1970,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME); - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < LARGE_ENTRY_COUNT; i++) { final long[] data = new long[LARGE_ARR_SIZE]; Arrays.fill(data, i);
