Repository: ignite Updated Branches: refs/heads/ignite-5578 95a41e885 -> 25520bf97
ignite-5578 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/25520bf9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/25520bf9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/25520bf9 Branch: refs/heads/ignite-5578 Commit: 25520bf97f03218529bb8ae39ba9510453567793 Parents: 95a41e8 Author: sboikov <[email protected]> Authored: Fri Aug 4 10:43:55 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Aug 4 11:39:30 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 22 +++-- .../processors/cache/GridCacheIoManager.java | 18 ++++ .../dht/GridClientPartitionTopology.java | 6 +- .../dht/GridDhtPartitionTopologyImpl.java | 99 +++++++++----------- .../GridDhtPartitionsExchangeFuture.java | 10 +- .../datastreamer/DataStreamerImpl.java | 9 +- .../distributed/CacheExchangeMergeTest.java | 9 +- 7 files changed, 99 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/25520bf9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index d02df5c..b99f054 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -186,9 +186,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } if (!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) { - assert lastAffVer == null || topVer.compareTo(lastAffVer) > 0; + synchronized (mux) { + assert lastAffVer == null || topVer.compareTo(lastAffVer) > 0; - lastAffVer = topVer; + lastAffVer = topVer; + } } } @@ -266,7 +268,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap CacheAffinityChangeMessage msg = null; synchronized (mux) { - if (waitInfo == null) + if (waitInfo == null || !waitInfo.topVer.equals(lastAffVer) ) return; Map<Integer, UUID> partWait = waitInfo.waitGrps.get(checkGrpId); @@ -305,14 +307,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } } } - } - try { - if (msg != null) - cctx.discovery().sendCustomEvent(msg); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send affinity change message.", e); + try { + if (msg != null) + cctx.discovery().sendCustomEvent(msg); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send affinity change message.", e); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/25520bf9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 91872e8..6529795 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -480,6 +480,24 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } /** + * @return Lock or {@code null} if node is stopping. + */ + @Nullable public Lock readLock() { + Lock lock = rw.readLock(); + + if (!lock.tryLock()) + return null; + + if (stopping) { + lock.unlock(); + + return null; + } + + return lock; + } + + /** * */ public void writeLock() { http://git-wip-us.apache.org/repos/asf/ignite/blob/25520bf9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 12f3fb3..c934df0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -313,7 +313,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); if (exchFut.context().events().hasServerLeft()) { - for (DiscoveryEvent evt : exchFut.context().events().events()) { + List<DiscoveryEvent> evts0 = exchFut.context().events().events(); + + for (int i = 0; i < evts0.size(); i++) { + DiscoveryEvent evt = evts0.get(i); + if (ExchangeDiscoveryEvents.serverLeftEvent(evt)) removeNode(evt.eventNode().id()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/25520bf9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 0f804d9..3ecb443a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -109,12 +109,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** */ private volatile AffinityTopologyVersion diffFromAffinityVer = AffinityTopologyVersion.NONE; - /** */ - private volatile AffinityTopologyVersion readyTopVer = AffinityTopologyVersion.NONE; - - /** */ + /** Last started exchange version (always >= readyTopVer). */ private volatile AffinityTopologyVersion lastTopChangeVer = AffinityTopologyVersion.NONE; + /** Last finished exchange version. */ + private volatile AffinityTopologyVersion readyTopVer = AffinityTopologyVersion.NONE; + /** Discovery cache. */ private volatile DiscoCache discoCache; @@ -261,8 +261,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { return topVer; } - @Override - public AffinityTopologyVersion lastTopologyChangeVersion() { + /** {@inheritDoc} */ + @Override public AffinityTopologyVersion lastTopologyChangeVersion() { AffinityTopologyVersion topVer = this.lastTopChangeVer; assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer + @@ -322,9 +322,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { assert grp.affinity().lastVersion().equals(affVer) : "Invalid affinity [topVer=" + grp.affinity().lastVersion() + - ", grp=" + grp.cacheOrGroupName() + - ", affVer=" + affVer + - ", fut=" + exchFut + ']'; + ", grp=" + grp.cacheOrGroupName() + + ", affVer=" + affVer + + ", fut=" + exchFut + ']'; int num = grp.affinity().partitions(); @@ -505,8 +505,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } if (evts.hasServerLeft()) { - for (DiscoveryEvent evt : evts.events()) { - if (evts.serverLeftEvent(evt)) + List<DiscoveryEvent> evts0 = evts.events(); + + for (int i = 0; i < evts0.size(); i++) { + DiscoveryEvent evt = evts0.get(i); + + if (ExchangeDiscoveryEvents.serverLeftEvent(evt)) removeNode(evt.eventNode().id()); } } @@ -581,7 +585,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (stopping) return false; - assert readyTopVer.topologyVersion() > 0 : readyTopVer; + assert readyTopVer.initialized() : readyTopVer; assert lastTopChangeVer.equals(readyTopVer); if (log.isDebugEnabled()) @@ -659,12 +663,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - List<List<ClusterNode>> aff = grp.affinity().readyAssignments(topVer); + AffinityAssignment aff = grp.affinity().readyAffinity(topVer); if (node2part != null && node2part.valid()) - changed |= checkEvictions(updateSeq, topVer, aff); + changed |= checkEvictions(updateSeq, aff); - updateRebalanceVersion(aff); + updateRebalanceVersion(aff.assignment()); consistencyCheck(); } @@ -1156,7 +1160,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { lock.writeLock().lock(); try { - if (stopping || lastTopChangeVer == null) + if (stopping || !lastTopChangeVer.initialized()) return false; if (incomeCntrMap != null) { @@ -1185,13 +1189,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (exchangeVer != null) { // Ignore if exchange already finished or new exchange started. if (readyTopVer.compareTo(exchangeVer) > 0 || lastTopChangeVer.compareTo(exchangeVer) > 0) { - if (log.isDebugEnabled()) { - log.debug("Stale exchange id for full partition map update (will ignore) [" + - "lastTopChange=" + lastTopChangeVer + - ", readTopVer=" + readyTopVer + - ", exchVer=" + exchangeVer + ']'); - } - U.warn(log, "Stale exchange id for full partition map update (will ignore) [" + "lastTopChange=" + lastTopChangeVer + ", readTopVer=" + readyTopVer + @@ -1202,13 +1199,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } if (msgTopVer != null && lastTopChangeVer.compareTo(msgTopVer) > 0) { - if (log.isDebugEnabled()) { - log.debug("Stale version for full partition map update message (will ignore) [" + - "lastTopChange=" + lastTopChangeVer + - ", readTopVer=" + readyTopVer + - ", msgVer=" + msgTopVer + ']'); - } - U.warn(log, "Stale version for full partition map update message (will ignore) [" + "lastTopChange=" + lastTopChangeVer + ", readTopVer=" + readyTopVer + @@ -1382,12 +1372,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) { - List<List<ClusterNode>> aff = grp.affinity().readyAssignments(readyTopVer); + AffinityAssignment aff = grp.affinity().readyAffinity(readyTopVer); if (exchangeVer == null) - changed |= checkEvictions(updateSeq, readyTopVer, aff); + changed |= checkEvictions(updateSeq, aff); - updateRebalanceVersion(aff); + updateRebalanceVersion(aff.assignment()); } consistencyCheck(); @@ -1490,14 +1480,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { return false; if (!force) { - if (lastTopChangeVer != null && exchId != null && lastTopChangeVer.compareTo(exchId.topologyVersion()) > 0) { - if (log.isDebugEnabled()) { - log.debug("Stale exchange id for single partition map update (will ignore) [" + - "lastTopChange=" + lastTopChangeVer + - ", readTopVer=" + readyTopVer + - ", exch=" + exchId.topologyVersion() + ']'); - } - + if (lastTopChangeVer.initialized() && exchId != null && lastTopChangeVer.compareTo(exchId.topologyVersion()) > 0) { U.warn(log, "Stale exchange id for single partition map update (will ignore) [" + "lastTopChange=" + lastTopChangeVer + ", readTopVer=" + readyTopVer + @@ -1518,11 +1501,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { parts.updateSequence(cur.updateSequence(), cur.topologyVersion()); } else if (isStaleUpdate(cur, parts)) { - if (log.isDebugEnabled()) { - log.debug("Stale update for single partition map update (will ignore) [exchId=" + exchId + - ", curMap=" + cur + ", newMap=" + parts + ']'); - } - U.warn(log, "Stale update for single partition map update (will ignore) [exchId=" + exchId + ", curMap=" + cur + ", newMap=" + parts + ']'); @@ -1541,7 +1519,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { node2part.put(parts.nodeId(), parts); - // During exchange calculate diff after all messages are received and affinity initialized. + // During exchange diff is calculated after all messages are received and affinity initialized. if (exchId == null && !grp.isReplicated()) { if (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0) { AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer); @@ -1589,12 +1567,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) { - List<List<ClusterNode>> aff = grp.affinity().assignments(readyTopVer); + AffinityAssignment aff = grp.affinity().readyAffinity(readyTopVer); if (exchId == null) - changed |= checkEvictions(updateSeq, readyTopVer, aff); + changed |= checkEvictions(updateSeq, aff); - updateRebalanceVersion(aff); + updateRebalanceVersion(aff.assignment()); } consistencyCheck(); @@ -1635,6 +1613,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } + /** + * @param aff Affinity. + */ private void createMovingPartitions(AffinityAssignment aff) { for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { GridDhtPartitionMap map = e.getValue(); @@ -1644,6 +1625,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } + /** + * @param map Node partition state map. + * @param parts Partitions assigned to node. + */ private void addMoving(GridDhtPartitionMap map, Set<Integer> parts) { if (F.isEmpty(parts)) return; @@ -1813,7 +1798,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - checkEvictions(updSeq, resTopVer, grp.affinity().readyAssignments(resTopVer)); + checkEvictions(updSeq, grp.affinity().readyAffinity(resTopVer)); grp.needsRecovery(false); } @@ -1914,7 +1899,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @param aff Affinity assignments. * @return Checks if any of the local partitions need to be evicted. */ - private boolean checkEvictions(long updateSeq, AffinityTopologyVersion affVer, List<List<ClusterNode>> aff) { + private boolean checkEvictions(long updateSeq, AffinityAssignment aff) { boolean changed = false; UUID locId = ctx.localNodeId(); @@ -1931,7 +1916,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { List<ClusterNode> affNodes = aff.get(p); if (!affNodes.contains(ctx.localNode())) { - List<ClusterNode> nodes = nodes(p, affVer, OWNING, null); + List<ClusterNode> nodes = nodes(p, aff.topologyVersion(), OWNING, null); Collection<UUID> nodeIds = F.nodeIds(nodes); // If all affinity nodes are owners, then evict partition from local node. @@ -1940,7 +1925,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { part.rent(false); - updateSeq = updateLocal(part.id(), part.state(), updateSeq, affVer); + updateSeq = updateLocal(part.id(), part.state(), updateSeq, aff.topologyVersion()); changed = true; @@ -1965,7 +1950,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { part.rent(false); - updateSeq = updateLocal(part.id(), part.state(), updateSeq, affVer); + updateSeq = updateLocal(part.id(), + part.state(), + updateSeq, + aff.topologyVersion()); changed = true; @@ -1991,6 +1979,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @param p Partition. * @param state State. * @param updateSeq Update sequence. + * @param affVer Affinity version. * @return Update sequence. */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) http://git-wip-us.apache.org/repos/asf/ignite/blob/25520bf9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 546b17b..67e41b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -34,6 +34,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -2999,14 +3000,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte newCrdFut.listen(new CI1<IgniteInternalFuture>() { @Override public void apply(IgniteInternalFuture fut) { - if (isDone() || !enterBusy()) + if (isDone()) + return; + + Lock lock = cctx.io().readLock(); + + if (lock == null) return; try { onBecomeCoordinator((InitNewCoordinatorFuture) fut); } finally { - leaveBusy(); + lock.unlock(); } } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/25520bf9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index afaa81d..e2ee0b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -768,9 +768,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } try { - AffinityTopologyVersion topVer = allowOverwrite() || cctx.isLocal() ? - ctx.cache().context().exchange().readyAffinityVersion() : - cctx.topology().readyTopologyVersion(); + AffinityTopologyVersion topVer; + + if (!cctx.isLocal()) + topVer = ctx.cache().context().exchange().lastTopologyFuture().get(); + else + topVer = ctx.cache().context().exchange().readyAffinityVersion(); for (DataStreamerEntry entry : entries) { List<ClusterNode> nodes; http://git-wip-us.apache.org/repos/asf/ignite/blob/25520bf9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java index d9d7e08..3149385 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java @@ -1315,7 +1315,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL) { for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { for (TransactionIsolation isolation : TransactionIsolation.values()) - checkNodeCaches(node, cache, concurrency, isolation); + checkNodeCaches(err, node, cache, concurrency, isolation); } } } @@ -1327,12 +1327,15 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { } /** + * @param err Error message. * @param node Node. * @param cache Cache. * @param concurrency Transaction concurrency. * @param isolation Transaction isolation. */ - private void checkNodeCaches(Ignite node, + private void checkNodeCaches( + String err, + Ignite node, IgniteCache<Object, Object> cache, TransactionConcurrency concurrency, TransactionIsolation isolation) { @@ -1362,7 +1365,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { } for (Map.Entry<Object, Object> e : map.entrySet()) - assertEquals(e.getValue(), cache.get(e.getKey())); + assertEquals(err, e.getValue(), cache.get(e.getKey())); } /**
