Repository: ignite Updated Branches: refs/heads/ignite-6643 ca877b434 -> 38a488313 (forced update)
IGNITE-7500 Set clear flag on last supply message - Fixes #3421. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fb5b6134 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fb5b6134 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fb5b6134 Branch: refs/heads/ignite-6643 Commit: fb5b61347e74b7ee4d50e71d92470897446cefe2 Parents: 5216ac5 Author: Pavel Kovalenko <[email protected]> Authored: Tue Jan 23 17:30:41 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Tue Jan 23 17:33:52 2018 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionSupplier.java | 10 +- ...idCacheRebalancingPartitionCountersTest.java | 178 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite3.java | 2 + 3 files changed, 185 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/fb5b6134/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 6eb31ed..7194b24 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -30,9 +30,9 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator; -import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.T3; @@ -303,11 +303,8 @@ class GridDhtPartitionSupplier { iter = grp.offheap().rebalanceIterator(part, d.topologyVersion(), d.isHistorical(part) ? d.partitionCounter(part) : null); - if (!iter.historical()) { + if (!iter.historical()) assert !grp.persistenceEnabled() || !d.isHistorical(part); - - s.clean(part); - } else assert grp.persistenceEnabled() && d.isHistorical(part); } @@ -416,6 +413,9 @@ class GridDhtPartitionSupplier { // Mark as last supply message. s.last(part, loc.updateCounter()); + if (!d.isHistorical(part)) + s.clean(part); + phase = SupplyContextPhase.NEW; sctx = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/fb5b6134/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionCountersTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionCountersTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionCountersTest.java new file mode 100644 index 0000000..0676c45 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionCountersTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.distributed.rebalancing; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class GridCacheRebalancingPartitionCountersTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE_NAME = "cache"; + + /** */ + private static final int PARTITIONS_CNT = 10; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setConsistentId(igniteInstanceName) + .setDataStorageConfiguration( + new DataStorageConfiguration() + .setCheckpointFrequency(3_000) + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true) + .setMaxSize(100L * 1024 * 1024))) + .setCacheConfiguration(new CacheConfiguration(CACHE_NAME) + .setBackups(2) + .setRebalanceBatchSize(4096) // Force to create several supply messages during rebalancing. + .setAffinity( + new RendezvousAffinityFunction(false, PARTITIONS_CNT))); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + } + + /** + * + */ + private boolean contains(int[] arr, int a) { + for (int i : arr) + if (i == a) + return true; + + return false; + } + + /** + * Tests that after rebalancing all partition update counters have the same value on all nodes. + */ + public void test() throws Exception { + IgniteEx ignite = (IgniteEx)startGrids(3); + + ignite.active(true); + + IgniteCache cache = ignite.cache(CACHE_NAME); + + for (int i = 0; i < 256; i++) + cache.put(i, i); + + final int problemNode = 2; + + IgniteEx node = (IgniteEx) ignite(problemNode); + int[] primaryPartitions = node.affinity(CACHE_NAME).primaryPartitions(node.cluster().localNode()); + + ignite.active(false); + + boolean primaryRemoved = false; + for (int i = 0; i < PARTITIONS_CNT; i++) { + String nodeName = getTestIgniteInstanceName(problemNode); + + Path dirPath = Paths.get(U.defaultWorkDirectory(), "db", nodeName.replace(".", "_"), CACHE_NAME + "-" + CACHE_NAME); + + info("Path: " + dirPath.toString()); + + assertTrue(Files.exists(dirPath)); + + for (File f : dirPath.toFile().listFiles()) { + if (f.getName().equals("part-" + i + ".bin")) { + if (contains(primaryPartitions, i)) { + info("Removing: " + f.getName()); + + primaryRemoved = true; + + f.delete(); + } + } + else if (f.getName().equals("index.bin")) { + info("Removing: " + f.getName()); + + f.delete(); + } + } + } + + assertTrue(primaryRemoved); + + ignite.active(true); + waitForRebalancing(); + + List<String> issues = new ArrayList<>(); + HashMap<Integer, Long> partMap = new HashMap<>(); + + for (int i = 0; i < 3; i++) + checkUpdCounter((IgniteEx)ignite(i), issues, partMap); + + for (String issue : issues) + error(issue); + + assertTrue(issues.isEmpty()); + } + + /** + * + */ + private void checkUpdCounter(IgniteEx ignite, List<String> issues, HashMap<Integer, Long> partMap) { + final CacheGroupContext grpCtx = ignite.context().cache().cacheGroup(CU.cacheId(CACHE_NAME)); + + assertNotNull(grpCtx); + + GridDhtPartitionTopologyImpl top = (GridDhtPartitionTopologyImpl)grpCtx.topology(); + + List<GridDhtLocalPartition> locParts = top.localPartitions(); + + for (GridDhtLocalPartition part : locParts) { + Long cnt = partMap.get(part.id()); + + if (cnt == null) + partMap.put(part.id(), part.updateCounter()); + + if ((cnt != null && part.updateCounter() != cnt) || part.updateCounter() == 0) + issues.add("Node name " + ignite.name() + "Part = " + part.id() + " updCounter " + part.updateCounter()); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/fb5b6134/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java index a6be07e..674b6a2 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePut import org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxReentryNearSelfTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRabalancingDelayedPartitionMapExchangeSelfTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingAsyncSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingPartitionCountersTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncCheckDataTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingUnmarshallingFailedSelfTest; @@ -146,6 +147,7 @@ public class IgniteCacheTestSuite3 extends TestSuite { suite.addTestSuite(GridCacheRebalancingUnmarshallingFailedSelfTest.class); suite.addTestSuite(GridCacheRebalancingAsyncSelfTest.class); suite.addTestSuite(GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.class); + suite.addTestSuite(GridCacheRebalancingPartitionCountersTest.class); // Test for byte array value special case. suite.addTestSuite(GridCacheLocalByteArrayValuesSelfTest.class);
