IGNITE-7278 Fixed partition state recovery from WAL
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/000b71ef Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/000b71ef Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/000b71ef Branch: refs/heads/ignite-6644 Commit: 000b71efd795031f8f3aed114e2ce662c7195987 Parents: fb5b613 Author: Dmitriy Govorukhin <[email protected]> Authored: Tue Jan 23 18:30:04 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Tue Jan 23 18:30:04 2018 +0300 ---------------------------------------------------------------------- .../GridCacheDatabaseSharedManager.java | 26 +- .../IgnitePdsContinuousRestartTest2.java | 291 +++++++++++++++++++ .../ignite/testsuites/IgnitePdsTestSuite2.java | 2 + 3 files changed, 305 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/000b71ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 85c4005..205365b 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -1916,26 +1916,24 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan break; case PARTITION_DESTROY: - if (apply) { - PartitionDestroyRecord destroyRec = (PartitionDestroyRecord)rec; + PartitionDestroyRecord destroyRec = (PartitionDestroyRecord)rec; - final int gId = destroyRec.groupId(); + final int gId = destroyRec.groupId(); - if (storeOnly && gId != METASTORAGE_CACHE_ID) - continue; + if (storeOnly && gId != METASTORAGE_CACHE_ID) + continue; - if (!ignoreGrps.contains(gId)) { - final int pId = destroyRec.partitionId(); + if (!ignoreGrps.contains(gId)) { + final int pId = destroyRec.partitionId(); - PageMemoryEx pageMem = gId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(gId); + PageMemoryEx pageMem = gId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(gId); - pageMem.clearAsync(new P3<Integer, Long, Integer>() { - @Override public boolean apply(Integer cacheId, Long pageId, Integer tag) { - return cacheId == gId && PageIdUtils.partId(pageId) == pId; - } - }, true).get(); + pageMem.clearAsync(new P3<Integer, Long, Integer>() { + @Override public boolean apply(Integer cacheId, Long pageId, Integer tag) { + return cacheId == gId && PageIdUtils.partId(pageId) == pId; + } + }, true).get(); - } } break; http://git-wip-us.apache.org/repos/asf/ignite/blob/000b71ef/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest2.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest2.java new file mode 100644 index 0000000..f45fc50 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest2.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence; + +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR; + +/** + * Cause by https://issues.apache.org/jira/browse/IGNITE-7278 + */ +public class IgnitePdsContinuousRestartTest2 extends GridCommonAbstractTest { + /** */ + private static final int GRID_CNT = 4; + + /** */ + private static final int ENTRIES_COUNT = 10_000; + + /** */ + public static final String CACHE_NAME = "cache1"; + + /** Checkpoint delay. */ + private volatile int checkpointDelay = -1; + + /** */ + private boolean cancel; + + /** + * Default constructor. + */ + public IgnitePdsContinuousRestartTest2() { + + } + + /** + * @param cancel Cancel. + */ + public IgnitePdsContinuousRestartTest2(boolean cancel) { + this.cancel = cancel; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + DataStorageConfiguration memCfg = new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setMaxSize(400 * 1024 * 1024) + .setPersistenceEnabled(true)) + .setWalMode(WALMode.LOG_ONLY) + .setCheckpointFrequency(checkpointDelay); + + cfg.setDataStorageConfiguration(memCfg); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(CACHE_NAME); + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + ccfg.setAffinity(new RendezvousAffinityFunction(false, 128)); + ccfg.setBackups(2); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + stopAllGrids(); + + deleteWorkFiles(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + deleteWorkFiles(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void deleteWorkFiles() throws IgniteCheckedException { + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false)); + } + + /** + * @throws Exception if failed. + */ + public void testRebalancingDuringLoad_1000_500_1_1() throws Exception { + checkRebalancingDuringLoad(1000, 500, 1, 1); + } + + /** + * @throws Exception if failed. + */ + public void testRebalancingDuringLoad_8000_500_1_1() throws Exception { + checkRebalancingDuringLoad(8000, 500, 1, 1); + } + + /** + * @throws Exception if failed. + */ + public void testRebalancingDuringLoad_1000_20000_1_1() throws Exception { + checkRebalancingDuringLoad(1000, 20000, 1, 1); + } + + /** + * @throws Exception if failed. + */ + public void testRebalancingDuringLoad_8000_8000_1_1() throws Exception { + checkRebalancingDuringLoad(8000, 8000, 1, 1); + } + + /** + * @throws Exception if failed. + */ + public void testRebalancingDuringLoad_1000_500_8_1() throws Exception { + checkRebalancingDuringLoad(1000, 500, 8, 1); + } + + /** + * @throws Exception if failed. + */ + public void testRebalancingDuringLoad_8000_500_8_1() throws Exception { + checkRebalancingDuringLoad(8000, 500, 8, 1); + } + + /** + * @throws Exception if failed. + */ + public void testRebalancingDuringLoad_1000_20000_8_1() throws Exception { + checkRebalancingDuringLoad(1000, 20000, 8, 1); + } + + /** + * @throws Exception if failed. + */ + public void testRebalancingDuringLoad_8000_8000_8_1() throws Exception { + checkRebalancingDuringLoad(8000, 8000, 8, 1); + } + + /** + * @throws Exception if failed. + */ + public void testRebalancingDuringLoad_1000_500_8_16() throws Exception { + checkRebalancingDuringLoad(1000, 500, 8, 16); + } + + /** + * @throws Exception if failed. + */ + public void testRebalancingDuringLoad_8000_500_8_16() throws Exception { + checkRebalancingDuringLoad(8000, 500, 8, 16); + } + + /** + * @throws Exception if failed. + */ + public void testRebalancingDuringLoad_1000_20000_8_16() throws Exception { + checkRebalancingDuringLoad(1000, 20000, 8, 16); + } + + /** + * @throws Exception if failed. + */ + public void testRebalancingDuringLoad_8000_8000_8_16() throws Exception { + checkRebalancingDuringLoad(8000, 8000, 8, 16); + } + + /** + * + * @throws Exception if failed. + */ + public void testRebalncingDuringLoad_10_10_1_1() throws Exception { + checkRebalancingDuringLoad(10, 10, 1, 1); + } + + /** + * + * @throws Exception if failed. + */ + public void testRebalncingDuringLoad_10_500_8_16() throws Exception { + checkRebalancingDuringLoad(10, 500, 8, 16); + } + + /** + * @throws Exception if failed. + */ + private void checkRebalancingDuringLoad( + int restartDelay, + int checkpointDelay, + int threads, + final int batch + ) throws Exception { + this.checkpointDelay = checkpointDelay; + + startGrids(GRID_CNT); + + final Ignite load = ignite(0); + + load.cluster().active(true); + + try (IgniteDataStreamer<Object, Object> s = load.dataStreamer(CACHE_NAME)) { + s.allowOverwrite(true); + + for (int i = 0; i < ENTRIES_COUNT; i++) + s.addData(i, i); + } + + final AtomicBoolean done = new AtomicBoolean(false); + + IgniteInternalFuture<?> busyFut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + IgniteCache<Object, Object> cache = load.cache(CACHE_NAME); + Random rnd = ThreadLocalRandom.current(); + + while (!done.get()) { + Map<Integer, Integer> map = new TreeMap<>(); + + for (int i = 0; i < batch; i++) + map.put(rnd.nextInt(ENTRIES_COUNT), rnd.nextInt()); + + cache.putAll(map); + } + + return null; + } + }, threads, "updater"); + + long end = System.currentTimeMillis() + 90_000; + + Random rnd = ThreadLocalRandom.current(); + + while (System.currentTimeMillis() < end) { + int idx = rnd.nextInt(GRID_CNT - 1) + 1; + + stopGrid(idx, cancel); + + U.sleep(restartDelay); + + startGrid(idx); + + U.sleep(restartDelay); + } + + done.set(true); + + busyFut.get(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/000b71ef/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index 3852a16..a3dc5a1 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -20,6 +20,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.persistence.IgniteDataStorageMetricsSelfTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTest; +import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTest2; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsExchangeDuringCheckpointTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPageSizesTest; @@ -92,6 +93,7 @@ public class IgnitePdsTestSuite2 extends TestSuite { suite.addTestSuite(IgniteWalHistoryReservationsTest.class); suite.addTestSuite(IgnitePdsContinuousRestartTest.class); + suite.addTestSuite(IgnitePdsContinuousRestartTest2.class); suite.addTestSuite(IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes.class);
