IGNITE-6434 Fixed error in checkpointer during topology change. Fixes #2718
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/21de1c56 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/21de1c56 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/21de1c56 Branch: refs/heads/ignite-2.1.5-p1 Commit: 21de1c56268c18685dd3620b7e3dc776ca2cf532 Parents: ae9c6d6 Author: Eduard Shangareev <[email protected]> Authored: Fri Sep 22 16:17:42 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Fri Sep 22 16:18:36 2017 +0300 ---------------------------------------------------------------------- .../dht/GridDhtPartitionTopologyImpl.java | 495 ++++++++++--------- .../GridCacheDatabaseSharedManager.java | 5 +- .../IgnitePdsExchangeDuringCheckpointTest.java | 135 +++++ .../IgniteCacheDataStructuresSelfTestSuite.java | 4 +- .../ignite/testsuites/IgnitePdsTestSuite2.java | 4 + 5 files changed, 407 insertions(+), 236 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/21de1c56/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index cad21d3..5a1e050 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -298,20 +298,27 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtPartitionsExchangeFuture exchFut) throws IgniteInterruptedCheckedException { - U.writeLock(lock); + ctx.database().checkpointReadLock(); try { - if (stopping) - return; + U.writeLock(lock); - long updateSeq = this.updateSeq.incrementAndGet(); + try { + if (stopping) + return; - initPartitions0(affVer, exchFut, updateSeq); + long updateSeq = this.updateSeq.incrementAndGet(); - consistencyCheck(); + initPartitions0(affVer, exchFut, updateSeq); + + consistencyCheck(); + } + finally { + lock.writeLock().unlock(); + } } finally { - lock.writeLock().unlock(); + ctx.database().checkpointReadUnlock(); } } @@ -589,101 +596,109 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { ", affVer=" + grp.affinity().lastVersion() + ", fut=" + exchFut + ']'; - lock.writeLock().lock(); + ctx.database().checkpointReadLock(); try { - if (stopping) - return false; - assert readyTopVer.initialized() : readyTopVer; - assert lastTopChangeVer.equals(readyTopVer); + lock.writeLock().lock(); - if (log.isDebugEnabled()) - log.debug("Partition map before afterExchange [exchId=" + exchFut.exchangeId() + ", fullMap=" + - fullMapString() + ']'); + try { + if (stopping) + return false; - long updateSeq = this.updateSeq.incrementAndGet(); + assert readyTopVer.initialized() : readyTopVer; + assert lastTopChangeVer.equals(readyTopVer); - for (int p = 0; p < num; p++) { - GridDhtLocalPartition locPart = localPartition0(p, topVer, false, false, false); + if (log.isDebugEnabled()) + log.debug("Partition map before afterExchange [exchId=" + exchFut.exchangeId() + ", fullMap=" + + fullMapString() + ']'); - if (partitionLocalNode(p, topVer)) { - // This partition will be created during next topology event, - // which obviously has not happened at this point. - if (locPart == null) { - if (log.isDebugEnabled()) - log.debug("Skipping local partition afterExchange (will not create): " + p); + long updateSeq = this.updateSeq.incrementAndGet(); - continue; - } + for (int p = 0; p < num; p++) { + GridDhtLocalPartition locPart = localPartition0(p, topVer, false, false, false); - GridDhtPartitionState state = locPart.state(); + if (partitionLocalNode(p, topVer)) { + // This partition will be created during next topology event, + // which obviously has not happened at this point. + if (locPart == null) { + if (log.isDebugEnabled()) + log.debug("Skipping local partition afterExchange (will not create): " + p); - if (state == MOVING) { - if (grp.rebalanceEnabled()) { - Collection<ClusterNode> owners = owners(p); + continue; + } - // If there are no other owners, then become an owner. - if (F.isEmpty(owners)) { - boolean owned = locPart.own(); + GridDhtPartitionState state = locPart.state(); - assert owned : "Failed to own partition [grp=" + grp.cacheOrGroupName() + ", locPart=" + - locPart + ']'; + if (state == MOVING) { + if (grp.rebalanceEnabled()) { + Collection<ClusterNode> owners = owners(p); - updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer); + // If there are no other owners, then become an owner. + if (F.isEmpty(owners)) { + boolean owned = locPart.own(); - changed = true; + assert owned : "Failed to own partition [grp=" + grp.cacheOrGroupName() + ", locPart=" + + locPart + ']'; - if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { - DiscoveryEvent discoEvt = exchFut.events().lastEvent(); + updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer); - grp.addRebalanceEvent(p, - EVT_CACHE_REBALANCE_PART_DATA_LOST, - discoEvt.eventNode(), - discoEvt.type(), - discoEvt.timestamp()); - } + changed = true; - if (log.isDebugEnabled()) - log.debug("Owned partition: " + locPart); + if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { + DiscoveryEvent discoEvt = exchFut.events().lastEvent(); + + grp.addRebalanceEvent(p, + EVT_CACHE_REBALANCE_PART_DATA_LOST, + discoEvt.eventNode(), + discoEvt.type(), + discoEvt.timestamp()); + } + + if (log.isDebugEnabled()) + log.debug("Owned partition: " + locPart); + } + else if (log.isDebugEnabled()) + log.debug("Will not own partition (there are owners to rebalance from) [locPart=" + + locPart + ", owners = " + owners + ']'); } - else if (log.isDebugEnabled()) - log.debug("Will not own partition (there are owners to rebalance from) [locPart=" + - locPart + ", owners = " + owners + ']'); + else + updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer); } - else - updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer); } - } - else { - if (locPart != null) { - GridDhtPartitionState state = locPart.state(); + else { + if (locPart != null) { + GridDhtPartitionState state = locPart.state(); - if (state == MOVING) { - locPart.rent(false); + if (state == MOVING) { + locPart.rent(false); - updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer); + updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer); - changed = true; + changed = true; - if (log.isDebugEnabled()) - log.debug("Evicting moving partition (it does not belong to affinity): " + locPart); + if (log.isDebugEnabled()) + log.debug("Evicting moving partition (it does not belong to affinity): " + locPart); + } } } } - } - AffinityAssignment aff = grp.affinity().readyAffinity(topVer); + AffinityAssignment aff = grp.affinity().readyAffinity(topVer); - if (node2part != null && node2part.valid()) - changed |= checkEvictions(updateSeq, aff); + if (node2part != null && node2part.valid()) + changed |= checkEvictions(updateSeq, aff); - updateRebalanceVersion(aff.assignment()); + updateRebalanceVersion(aff.assignment()); - consistencyCheck(); + consistencyCheck(); + } + finally { + lock.writeLock().unlock(); + } } finally { - lock.writeLock().unlock(); + ctx.database().checkpointReadUnlock(); } return changed; @@ -709,6 +724,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private GridDhtLocalPartition createPartition(int p) { assert lock.isWriteLockedByCurrentThread(); + assert ctx.database().checkpointLockIsHeldByThread(); + GridDhtLocalPartition loc = locParts.get(p); if (loc == null || loc.state() == EVICTED) { @@ -1183,232 +1200,239 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { assert partMap != null; - lock.writeLock().lock(); + ctx.database().checkpointReadLock(); try { - if (stopping || !lastTopChangeVer.initialized() || - // Ignore message not-related to exchange if exchange is in progress. - (exchangeVer == null && !lastTopChangeVer.equals(readyTopVer))) - return false; + lock.writeLock().lock(); - if (incomeCntrMap != null) { - // update local counters in partitions - for (int i = 0; i < locParts.length(); i++) { - GridDhtLocalPartition part = locParts.get(i); + try { + if (stopping || !lastTopChangeVer.initialized() || + // Ignore message not-related to exchange if exchange is in progress. + (exchangeVer == null && !lastTopChangeVer.equals(readyTopVer))) + return false; - if (part == null) - continue; + if (incomeCntrMap != null) { + // update local counters in partitions + for (int i = 0; i < locParts.length(); i++) { + GridDhtLocalPartition part = locParts.get(i); + + if (part == null) + continue; - if (part.state() == OWNING || part.state() == MOVING) { - long updCntr = incomeCntrMap.updateCounter(part.id()); + if (part.state() == OWNING || part.state() == MOVING) { + long updCntr = incomeCntrMap.updateCounter(part.id()); - if (updCntr != 0 && updCntr > part.updateCounter()) - part.updateCounter(updCntr); + if (updCntr != 0 && updCntr > part.updateCounter()) + part.updateCounter(updCntr); + } } } - } - if (exchangeVer != null) { - // Ignore if exchange already finished or new exchange started. - if (readyTopVer.compareTo(exchangeVer) > 0 || lastTopChangeVer.compareTo(exchangeVer) > 0) { - U.warn(log, "Stale exchange id for full partition map update (will ignore) [" + + if (exchangeVer != null) { + // Ignore if exchange already finished or new exchange started. + if (readyTopVer.compareTo(exchangeVer) > 0 || lastTopChangeVer.compareTo(exchangeVer) > 0) { + U.warn(log, "Stale exchange id for full partition map update (will ignore) [" + + "lastTopChange=" + lastTopChangeVer + + ", readTopVer=" + readyTopVer + + ", exchVer=" + exchangeVer + ']'); + + return false; + } + } + + if (msgTopVer != null && lastTopChangeVer.compareTo(msgTopVer) > 0) { + U.warn(log, "Stale version for full partition map update message (will ignore) [" + "lastTopChange=" + lastTopChangeVer + ", readTopVer=" + readyTopVer + - ", exchVer=" + exchangeVer + ']'); + ", msgVer=" + msgTopVer + ']'); return false; } - } - - if (msgTopVer != null && lastTopChangeVer.compareTo(msgTopVer) > 0) { - U.warn(log, "Stale version for full partition map update message (will ignore) [" + - "lastTopChange=" + lastTopChangeVer + - ", readTopVer=" + readyTopVer + - ", msgVer=" + msgTopVer + ']'); - return false; - } + boolean fullMapUpdated = (node2part == null); - boolean fullMapUpdated = (node2part == null); + if (node2part != null) { + for (GridDhtPartitionMap part : node2part.values()) { + GridDhtPartitionMap newPart = partMap.get(part.nodeId()); - if (node2part != null) { - for (GridDhtPartitionMap part : node2part.values()) { - GridDhtPartitionMap newPart = partMap.get(part.nodeId()); + if (shouldOverridePartitionMap(part, newPart)) { + fullMapUpdated = true; - if (shouldOverridePartitionMap(part, newPart)) { - fullMapUpdated = true; + if (log.isDebugEnabled()) { + log.debug("Overriding partition map in full update map [exchVer=" + exchangeVer + + ", curPart=" + mapString(part) + + ", newPart=" + mapString(newPart) + ']'); + } - if (log.isDebugEnabled()) { - log.debug("Overriding partition map in full update map [exchVer=" + exchangeVer + - ", curPart=" + mapString(part) + - ", newPart=" + mapString(newPart) + ']'); + if (newPart.nodeId().equals(ctx.localNodeId())) + updateSeq.setIfGreater(newPart.updateSequence()); + } + else { + // If for some nodes current partition has a newer map, + // then we keep the newer value. + partMap.put(part.nodeId(), part); } - - if (newPart.nodeId().equals(ctx.localNodeId())) - updateSeq.setIfGreater(newPart.updateSequence()); - } - else { - // If for some nodes current partition has a newer map, - // then we keep the newer value. - partMap.put(part.nodeId(), part); } - } - // Check that we have new nodes. - for (GridDhtPartitionMap part : partMap.values()) { - if (fullMapUpdated) - break; + // Check that we have new nodes. + for (GridDhtPartitionMap part : partMap.values()) { + if (fullMapUpdated) + break; - fullMapUpdated = !node2part.containsKey(part.nodeId()); - } + fullMapUpdated = !node2part.containsKey(part.nodeId()); + } - // Remove entry if node left. - for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) { - UUID nodeId = it.next(); + // Remove entry if node left. + for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) { + UUID nodeId = it.next(); - if (!ctx.discovery().alive(nodeId)) { - if (log.isDebugEnabled()) - log.debug("Removing left node from full map update [nodeId=" + nodeId + ", partMap=" + - partMap + ']'); + if (!ctx.discovery().alive(nodeId)) { + if (log.isDebugEnabled()) + log.debug("Removing left node from full map update [nodeId=" + nodeId + ", partMap=" + + partMap + ']'); - it.remove(); + it.remove(); + } } } - } - else { - GridDhtPartitionMap locNodeMap = partMap.get(ctx.localNodeId()); - - if (locNodeMap != null) - updateSeq.setIfGreater(locNodeMap.updateSequence()); - } + else { + GridDhtPartitionMap locNodeMap = partMap.get(ctx.localNodeId()); - if (!fullMapUpdated) { - if (log.isDebugEnabled()) { - log.debug("No updates for full partition map (will ignore) [lastExch=" + lastTopChangeVer + - ", exchVer=" + exchangeVer + - ", curMap=" + node2part + - ", newMap=" + partMap + ']'); + if (locNodeMap != null) + updateSeq.setIfGreater(locNodeMap.updateSequence()); } - return false; - } + if (!fullMapUpdated) { + if (log.isDebugEnabled()) { + log.debug("No updates for full partition map (will ignore) [lastExch=" + lastTopChangeVer + + ", exchVer=" + exchangeVer + + ", curMap=" + node2part + + ", newMap=" + partMap + ']'); + } - if (exchangeVer != null) { - assert exchangeVer.compareTo(readyTopVer) >= 0 && exchangeVer.compareTo(lastTopChangeVer) >= 0; + return false; + } - lastTopChangeVer = readyTopVer = exchangeVer; - } + if (exchangeVer != null) { + assert exchangeVer.compareTo(readyTopVer) >= 0 && exchangeVer.compareTo(lastTopChangeVer) >= 0; - node2part = partMap; + lastTopChangeVer = readyTopVer = exchangeVer; + } - if (exchangeVer == null && !grp.isReplicated() && - (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0)) { - AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer); + node2part = partMap; - for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) { - for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) { - int p = e0.getKey(); + if (exchangeVer == null && !grp.isReplicated() && + (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0)) { + AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer); - Set<UUID> diffIds = diffFromAffinity.get(p); + for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) { + for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) { + int p = e0.getKey(); - if ((e0.getValue() == MOVING || e0.getValue() == OWNING || e0.getValue() == RENTING) && - !affAssignment.getIds(p).contains(e.getKey())) { + Set<UUID> diffIds = diffFromAffinity.get(p); - if (diffIds == null) - diffFromAffinity.put(p, diffIds = U.newHashSet(3)); + if ((e0.getValue() == MOVING || e0.getValue() == OWNING || e0.getValue() == RENTING) && + !affAssignment.getIds(p).contains(e.getKey())) { - diffIds.add(e.getKey()); - } - else { - if (diffIds != null && diffIds.remove(e.getKey())) { - if (diffIds.isEmpty()) - diffFromAffinity.remove(p); + if (diffIds == null) + diffFromAffinity.put(p, diffIds = U.newHashSet(3)); + + diffIds.add(e.getKey()); + } + else { + if (diffIds != null && diffIds.remove(e.getKey())) { + if (diffIds.isEmpty()) + diffFromAffinity.remove(p); + } } } } - } - diffFromAffinityVer = readyTopVer; - } + diffFromAffinityVer = readyTopVer; + } - boolean changed = false; + boolean changed = false; - GridDhtPartitionMap nodeMap = partMap.get(ctx.localNodeId()); + GridDhtPartitionMap nodeMap = partMap.get(ctx.localNodeId()); - if (nodeMap != null && ctx.database().persistenceEnabled() && readyTopVer.initialized()) { - for (Map.Entry<Integer, GridDhtPartitionState> e : nodeMap.entrySet()) { - int p = e.getKey(); - GridDhtPartitionState state = e.getValue(); + if (nodeMap != null && ctx.database().persistenceEnabled() && readyTopVer.initialized()) { + for (Map.Entry<Integer, GridDhtPartitionState> e : nodeMap.entrySet()) { + int p = e.getKey(); + GridDhtPartitionState state = e.getValue(); - if (state == OWNING) { - GridDhtLocalPartition locPart = locParts.get(p); + if (state == OWNING) { + GridDhtLocalPartition locPart = locParts.get(p); - assert locPart != null : grp.cacheOrGroupName(); + assert locPart != null : grp.cacheOrGroupName(); - if (locPart.state() == MOVING) { - boolean success = locPart.own(); + if (locPart.state() == MOVING) { + boolean success = locPart.own(); - assert success : locPart; + assert success : locPart; - changed |= success; + changed |= success; + } } - } - else if (state == MOVING) { - GridDhtLocalPartition locPart = locParts.get(p); + else if (state == MOVING) { + GridDhtLocalPartition locPart = locParts.get(p); - if (locPart == null || locPart.state() == EVICTED) - locPart = createPartition(p); + if (locPart == null || locPart.state() == EVICTED) + locPart = createPartition(p); - if (locPart.state() == OWNING) { - locPart.moving(); + if (locPart.state() == OWNING) { + locPart.moving(); - changed = true; + changed = true; + } } - } - else if (state == RENTING && partsToReload.contains(p)) { - GridDhtLocalPartition locPart = locParts.get(p); + else if (state == RENTING && partsToReload.contains(p)) { + GridDhtLocalPartition locPart = locParts.get(p); - if (locPart == null || locPart.state() == EVICTED) { - createPartition(p); + if (locPart == null || locPart.state() == EVICTED) { + createPartition(p); - changed = true; - } - else if (locPart.state() == OWNING || locPart.state() == MOVING) { - locPart.reload(true); + changed = true; + } + else if (locPart.state() == OWNING || locPart.state() == MOVING) { + locPart.reload(true); - locPart.rent(false); + locPart.rent(false); - changed = true; + changed = true; + } + else + locPart.reload(true); } - else - locPart.reload(true); } } - } - long updateSeq = this.updateSeq.incrementAndGet(); + long updateSeq = this.updateSeq.incrementAndGet(); - if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) { - AffinityAssignment aff = grp.affinity().readyAffinity(readyTopVer); + if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) { + AffinityAssignment aff = grp.affinity().readyAffinity(readyTopVer); - if (exchangeVer == null) - changed |= checkEvictions(updateSeq, aff); + if (exchangeVer == null) + changed |= checkEvictions(updateSeq, aff); - updateRebalanceVersion(aff.assignment()); - } + updateRebalanceVersion(aff.assignment()); + } - consistencyCheck(); + consistencyCheck(); - if (log.isDebugEnabled()) - log.debug("Partition map after full update: " + fullMapString()); + if (log.isDebugEnabled()) + log.debug("Partition map after full update: " + fullMapString()); - if (changed) - ctx.exchange().scheduleResendPartitions(); + if (changed) + ctx.exchange().scheduleResendPartitions(); - return changed; + return changed; + } + finally { + lock.writeLock().unlock(); + } } finally { - lock.writeLock().unlock(); + ctx.database().checkpointReadUnlock(); } } @@ -2188,27 +2212,34 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public void onEvicted(GridDhtLocalPartition part, boolean updateSeq) { - lock.writeLock().lock(); + ctx.database().checkpointReadLock(); try { - if (stopping) - return; + lock.writeLock().lock(); - assert part.state() == EVICTED; + try { + if (stopping) + return; - long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get(); + assert part.state() == EVICTED; - if (part.reload()) - part = createPartition(part.id()); + long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get(); - assert lastTopChangeVer.initialized() : lastTopChangeVer; + if (part.reload()) + part = createPartition(part.id()); - updateLocal(part.id(), part.state(), seq, lastTopChangeVer); + assert lastTopChangeVer.initialized() : lastTopChangeVer; - consistencyCheck(); + updateLocal(part.id(), part.state(), seq, lastTopChangeVer); + + consistencyCheck(); + } + finally { + lock.writeLock().unlock(); + } } finally { - lock.writeLock().unlock(); + ctx.database().checkpointReadUnlock(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/21de1c56/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 1b5dae6..85e3baa 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -2281,6 +2281,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan for (DbCheckpointListener lsnr : lsnrs) lsnr.onCheckpointBegin(ctx0); + if (curr.nextSnapshot) + snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) continue; @@ -2300,8 +2303,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan cpRec.addCacheGroupState(grp.groupId(), state); } - if (curr.nextSnapshot) - snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map); cpPagesTuple = beginAllCheckpoints(); http://git-wip-us.apache.org/repos/asf/ignite/blob/21de1c56/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java new file mode 100644 index 0000000..3969fb6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence; + +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.configuration.MemoryPolicyConfiguration; +import org.apache.ignite.configuration.PersistentStoreConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class IgnitePdsExchangeDuringCheckpointTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** + * + */ + public void testExchangeOnNodeLeft() throws Exception { + for (int i = 0; i < 5; i++) { + startGrids(3); + IgniteEx ignite = grid(1); + ignite.active(true); + + awaitPartitionMapExchange(); + + stopGrid(0, true); + + awaitPartitionMapExchange(); + + ignite.context().cache().context().database().wakeupForCheckpoint("test").get(10000); + + afterTest(); + } + } + + /** + * + */ + public void testExchangeOnNodeJoin() throws Exception { + for (int i = 0; i < 5; i++) { + startGrids(2); + IgniteEx ignite = grid(1); + ignite.active(true); + + awaitPartitionMapExchange(); + + IgniteEx ex = startGrid(2); + + awaitPartitionMapExchange(); + + ex.context().cache().context().database().wakeupForCheckpoint("test").get(10000); + + afterTest(); + } + } + + /** + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + MemoryConfiguration memCfg = new MemoryConfiguration(); + + MemoryPolicyConfiguration memPlcCfg = new MemoryPolicyConfiguration(); + + memPlcCfg.setName("dfltMemPlc"); + memPlcCfg.setInitialSize(100 * 1024 * 1024); + memPlcCfg.setMaxSize(1000 * 1024 * 1024); + + memCfg.setDefaultMemoryPolicyName("dfltMemPlc"); + memCfg.setMemoryPolicies(memPlcCfg); + + cfg.setMemoryConfiguration(memCfg); + + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setAffinity(new RendezvousAffinityFunction(false, 4096)); + + cfg.setCacheConfiguration(ccfg); + + PersistentStoreConfiguration psiCfg = new PersistentStoreConfiguration() + .setCheckpointingThreads(1) + .setCheckpointingFrequency(1) + .setWalMode(WALMode.LOG_ONLY); + + cfg.setPersistentStoreConfiguration(psiCfg); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/21de1c56/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java index 568af94..6e16d2e 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.datastructures.IgniteClientDa import org.apache.ignite.internal.processors.cache.datastructures.IgniteClientDiscoveryDataStructuresTest; import org.apache.ignite.internal.processors.cache.datastructures.IgniteDataStructureUniqueNameTest; import org.apache.ignite.internal.processors.cache.datastructures.IgniteDataStructureWithJobTest; +import org.apache.ignite.internal.processors.cache.datastructures.SemaphoreFailoverSafeReleasePermitsTest; import org.apache.ignite.internal.processors.cache.datastructures.local.GridCacheLocalAtomicQueueApiSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.local.GridCacheLocalAtomicSetSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.local.GridCacheLocalQueueApiSelfTest; @@ -129,8 +130,7 @@ public class IgniteCacheDataStructuresSelfTestSuite extends TestSuite { suite.addTest(new TestSuite(IgnitePartitionedCountDownLatchSelfTest.class)); suite.addTest(new TestSuite(IgniteDataStructureWithJobTest.class)); suite.addTest(new TestSuite(IgnitePartitionedSemaphoreSelfTest.class)); - // TODO https://issues.apache.org/jira/browse/IGNITE-4173, enable when fixed. - // suite.addTest(new TestSuite(SemaphoreFailoverSafeReleasePermitsTest.class)); + suite.addTest(new TestSuite(SemaphoreFailoverSafeReleasePermitsTest.class)); // TODO IGNITE-3141, enabled when fixed. // suite.addTest(new TestSuite(IgnitePartitionedLockSelfTest.class)); http://git-wip-us.apache.org/repos/asf/ignite/blob/21de1c56/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index ab8ff81..29ea64c 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -20,6 +20,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes; +import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsExchangeDuringCheckpointTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPageSizesTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistenceMetricsSelfTest; @@ -73,6 +74,9 @@ public class IgnitePdsTestSuite2 extends TestSuite { suite.addTestSuite(IgniteWalFlushFailoverTest.class); suite.addTestSuite(IgniteWalReaderTest.class); + + suite.addTestSuite(IgnitePdsExchangeDuringCheckpointTest.class); + return suite; } }
