Cleanup DiscoCache.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/788adc0d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/788adc0d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/788adc0d Branch: refs/heads/ignite-5896 Commit: 788adc0d00e0455e06cc9acabfe9ad425fdcd65b Parents: 1202f3f Author: sboikov <[email protected]> Authored: Wed Sep 6 11:14:21 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Sep 6 11:14:21 2017 +0300 ---------------------------------------------------------------------- .../internal/managers/discovery/DiscoCache.java | 62 +-- .../discovery/GridDiscoveryManager.java | 38 +- .../cache/CacheAffinitySharedManager.java | 17 +- .../GridCachePartitionExchangeManager.java | 9 +- .../processors/cache/GridCacheUtils.java | 53 +-- .../dht/GridClientPartitionTopology.java | 11 +- .../dht/GridDhtPartitionTopologyImpl.java | 6 +- .../cache/distributed/dht/GridDhtTxLocal.java | 48 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 440 +++++++++---------- .../dht/atomic/GridDhtAtomicCache.java | 2 +- .../dht/colocated/GridDhtColocatedCache.java | 4 +- .../GridDhtPartitionsExchangeFuture.java | 12 +- .../near/GridNearTransactionalCache.java | 4 +- .../cache/distributed/near/GridNearTxLocal.java | 20 +- .../cache/query/GridCacheQueryAdapter.java | 2 +- .../cache/query/GridCacheQueryManager.java | 6 +- .../cache/transactions/IgniteTxHandler.java | 15 +- .../service/GridServiceProcessor.java | 4 +- .../CacheLateAffinityAssignmentTest.java | 2 +- 19 files changed, 302 insertions(+), 453 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java index 5ac99f1..4b57eb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java @@ -58,13 +58,6 @@ public class DiscoCache { /** Daemon nodes. */ private final List<ClusterNode> daemonNodes; - /** All server nodes. */ - private final List<ClusterNode> srvNodesWithCaches; - - /** All nodes with at least one cache configured. */ - @GridToStringInclude - private final List<ClusterNode> allNodesWithCaches; - /** All remote nodes with at least one cache configured. */ @GridToStringInclude private final List<ClusterNode> rmtNodesWithCaches; @@ -97,8 +90,6 @@ public class DiscoCache { * @param allNodes All nodes. * @param srvNodes Server nodes. * @param daemonNodes Daemon nodes. - * @param srvNodesWithCaches Server nodes with at least one cache configured. - * @param allNodesWithCaches All nodes with at least one cache configured. * @param rmtNodesWithCaches Remote nodes with at least one cache configured. * @param allCacheNodes Cache nodes by cache name. * @param cacheGrpAffNodes Affinity nodes by cache group ID. @@ -113,8 +104,6 @@ public class DiscoCache { List<ClusterNode> allNodes, List<ClusterNode> srvNodes, List<ClusterNode> daemonNodes, - List<ClusterNode> srvNodesWithCaches, - List<ClusterNode> allNodesWithCaches, List<ClusterNode> rmtNodesWithCaches, Map<Integer, List<ClusterNode>> allCacheNodes, Map<Integer, List<ClusterNode>> cacheGrpAffNodes, @@ -127,8 +116,6 @@ public class DiscoCache { this.allNodes = allNodes; this.srvNodes = srvNodes; this.daemonNodes = daemonNodes; - this.srvNodesWithCaches = srvNodesWithCaches; - this.allNodesWithCaches = allNodesWithCaches; this.rmtNodesWithCaches = rmtNodesWithCaches; this.allCacheNodes = allCacheNodes; this.cacheGrpAffNodes = cacheGrpAffNodes; @@ -195,36 +182,13 @@ public class DiscoCache { return daemonNodes; } - /** @return Server nodes with at least one cache configured. */ - public List<ClusterNode> serverNodesWithCaches() { - return srvNodesWithCaches; - } - /** - * Gets all remote nodes that have at least one cache configured. + * Gets all alive remote nodes that have at least one cache configured. * * @return Collection of nodes. */ - public List<ClusterNode> remoteNodesWithCaches() { - return rmtNodesWithCaches; - } - - /** - * Gets collection of nodes with at least one cache configured. - * - * @return Collection of nodes. - */ - public List<ClusterNode> allNodesWithCaches() { - return allNodesWithCaches; - } - - /** - * Gets collection of server nodes with at least one cache configured. - * - * @return Collection of nodes. - */ - public Collection<ClusterNode> aliveServerNodes() { - return F.view(serverNodes(), new P1<ClusterNode>() { + public Collection<ClusterNode> remoteAliveNodesWithCaches() { + return F.view(rmtNodesWithCaches, new P1<ClusterNode>() { @Override public boolean apply(ClusterNode node) { return alives.contains(node.id()); } @@ -236,8 +200,8 @@ public class DiscoCache { * * @return Collection of nodes. */ - public Collection<ClusterNode> aliveServerNodesWithCaches() { - return F.view(serverNodesWithCaches(), new P1<ClusterNode>() { + public Collection<ClusterNode> aliveServerNodes() { + return F.view(serverNodes(), new P1<ClusterNode>() { @Override public boolean apply(ClusterNode node) { return alives.contains(node.id()); } @@ -248,16 +212,14 @@ public class DiscoCache { * @return Oldest alive server node. */ public @Nullable ClusterNode oldestAliveServerNode(){ - Iterator<ClusterNode> it = aliveServerNodes().iterator(); - return it.hasNext() ? it.next() : null; - } + for (int i = 0; i < srvNodes.size(); i++) { + ClusterNode srv = srvNodes.get(i); - /** - * @return Oldest alive server node with at least one cache configured. - */ - public @Nullable ClusterNode oldestAliveServerNodeWithCache(){ - Iterator<ClusterNode> it = aliveServerNodesWithCaches().iterator(); - return it.hasNext() ? it.next() : null; + if (alives.contains(srv.id())) + return srv; + } + + return null; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 56af9bf..fa5d053 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -1845,42 +1845,21 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Gets cache nodes for cache with given ID. - * - * @param cacheId Cache ID. - * @param topVer Topology version. - * @return Collection of cache nodes. - */ - public Collection<ClusterNode> cacheNodes(int cacheId, AffinityTopologyVersion topVer) { - return resolveDiscoCache(cacheId, topVer).cacheNodes(cacheId); - } - - /** - * Gets all nodes with at least one cache configured. - * - * @param topVer Topology version. - * @return Collection of cache nodes. - */ - public Collection<ClusterNode> cacheNodes(AffinityTopologyVersion topVer) { - return resolveDiscoCache(CU.cacheId(null), topVer).allNodesWithCaches(); - } - - /** * Gets cache remote nodes for cache with given name. * * @param topVer Topology version. * @return Collection of cache nodes. */ - public Collection<ClusterNode> remoteCacheNodes(AffinityTopologyVersion topVer) { - return resolveDiscoCache(CU.cacheId(null), topVer).remoteNodesWithCaches(); + public Collection<ClusterNode> remoteAliveNodesWithCaches(AffinityTopologyVersion topVer) { + return resolveDiscoCache(CU.cacheId(null), topVer).remoteAliveNodesWithCaches(); } /** * @param topVer Topology version (maximum allowed node order). * @return Oldest alive server nodes with at least one cache configured. */ - @Nullable public ClusterNode oldestAliveCacheServerNode(AffinityTopologyVersion topVer) { - return resolveDiscoCache(CU.cacheId(null), topVer).oldestAliveServerNodeWithCache(); + @Nullable public ClusterNode oldestAliveServerNode(AffinityTopologyVersion topVer) { + return resolveDiscoCache(CU.cacheId(null), topVer).oldestAliveServerNode(); } /** @@ -2243,9 +2222,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size()); Map<Integer, List<ClusterNode>> cacheGrpAffNodes = U.newHashMap(allNodes.size()); - Set<ClusterNode> allNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); - Set<ClusterNode> srvNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); for (ClusterNode node : allNodes) { assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']'; @@ -2270,11 +2247,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { CachePredicate filter = entry.getValue(); if (filter.cacheNode(node)) { - allNodesWithCaches.add(node); - - if(!CU.clientNode(node)) - srvNodesWithCaches.add(node); - if (!node.isLocal()) rmtNodesWithCaches.add(node); @@ -2291,8 +2263,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { Collections.unmodifiableList(allNodes), Collections.unmodifiableList(srvNodes), Collections.unmodifiableList(daemonNodes), - U.sealList(srvNodesWithCaches), - U.sealList(allNodesWithCaches), U.sealList(rmtNodesWithCaches), Collections.unmodifiableMap(allCacheNodes), Collections.unmodifiableMap(cacheGrpAffNodes), http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 39a5ea8..beb7a7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -1659,7 +1659,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (!aff.lastVersion().equals(topVer)) calculateAndInit(fut.events(), aff, topVer); - grpHolder.topology().beforeExchange(fut, true, false); + grpHolder.topology(fut.context().events().discoveryCache()).beforeExchange(fut, true, false); } else { List<GridDhtPartitionsExchangeFuture> exchFuts = cctx.exchange().exchangeFutures(); @@ -1715,7 +1715,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (!aff.lastVersion().equals(topVer)) calculateAndInit(fut.events(), aff, topVer); - grpHolder.topology().beforeExchange(fut, true, false); + grpHolder.topology(fut.context().events().discoveryCache()).beforeExchange(fut, true, false); } } @@ -1833,7 +1833,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap Map<UUID, GridDhtPartitionMap> map = affinityFullMap(aff); for (GridDhtPartitionMap map0 : map.values()) - cache.topology().update(fut.exchangeId(), map0, true); + cache.topology(fut.context().events().discoveryCache()).update(fut.exchangeId(), map0, true); } } }); @@ -2059,7 +2059,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap List<List<ClusterNode>> newAssignment0 = initAff ? new ArrayList<>(newAssignment) : null; - GridDhtPartitionTopology top = grpHolder.topology(); + GridDhtPartitionTopology top = grpHolder.topology(fut.context().events().discoveryCache()); Map<Integer, List<T>> cacheAssignment = null; @@ -2277,9 +2277,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** + * @param discoCache Discovery data cache. * @return Cache topology. */ - abstract GridDhtPartitionTopology topology(); + abstract GridDhtPartitionTopology topology(DiscoCache discoCache); /** * @return Affinity. @@ -2314,7 +2315,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** {@inheritDoc} */ - @Override public GridDhtPartitionTopology topology() { + @Override public GridDhtPartitionTopology topology(DiscoCache discoCache) { return grp.topology(); } } @@ -2390,8 +2391,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** {@inheritDoc} */ - @Override public GridDhtPartitionTopology topology() { - return cctx.exchange().clientTopology(groupId()); + @Override public GridDhtPartitionTopology topology(DiscoCache discoCache) { + return cctx.exchange().clientTopology(groupId(), discoCache); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index bd34a5f..fe9ed29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -719,9 +719,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * @param grpId Cache group ID. + * @param discoCache Discovery data cache. * @return Topology. */ - public GridDhtPartitionTopology clientTopology(int grpId) { + public GridDhtPartitionTopology clientTopology(int grpId, DiscoCache discoCache) { GridClientPartitionTopology top = clientTops.get(grpId); if (top != null) @@ -741,7 +742,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana aff.partitions()); GridClientPartitionTopology old = clientTops.putIfAbsent(grpId, - top = new GridClientPartitionTopology(cctx, grpId, aff.partitions(), affKey)); + top = new GridClientPartitionTopology(cctx, discoCache, grpId, aff.partitions(), affKey)); return old != null ? old : top; } @@ -925,7 +926,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * for non coordinator - {@link GridDhtPartitionsSingleMessage SingleMessages} send */ private void refreshPartitions() { - ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); + ClusterNode oldest = cctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE); if (oldest == null) { if (log.isDebugEnabled()) @@ -955,7 +956,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana AffinityTopologyVersion rmtTopVer = lastFut != null ? (lastFut.isDone() ? lastFut.topologyVersion() : lastFut.initialVersion()) : AffinityTopologyVersion.NONE; - Collection<ClusterNode> rmts = CU.remoteNodes(cctx, rmtTopVer); + Collection<ClusterNode> rmts = cctx.discovery().remoteAliveNodesWithCaches(rmtTopVer); if (log.isDebugEnabled()) log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 070fc81..2018a64 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -441,59 +441,14 @@ public class GridCacheUtils { } /** - * Gets all nodes on which cache with the same name is started. - * - * @param ctx Cache context. - * @param topOrder Maximum allowed node order. - * @return All nodes on which cache with the same name is started (including nodes - * that may have already left). - */ - public static Collection<ClusterNode> allNodes(GridCacheContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().cacheNodes(ctx.cacheId(), topOrder); - } - - /** - * Gets all nodes with at least one cache configured. - * - * @param ctx Shared cache context. - * @param topOrder Maximum allowed node order. - * @return All nodes on which cache with the same name is started (including nodes - * that may have already left). - */ - public static Collection<ClusterNode> allNodes(GridCacheSharedContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().cacheNodes(topOrder); - } - - /** - * Gets remote nodes with at least one cache configured. - * - * @param ctx Cache shared context. - * @param topVer Topology version. - * @return Collection of remote nodes with at least one cache configured. - */ - public static Collection<ClusterNode> remoteNodes(final GridCacheSharedContext ctx, AffinityTopologyVersion topVer) { - return ctx.discovery().remoteCacheNodes(topVer); - } - - /** - * Gets all nodes on which cache with the same name is started and the local DHT storage is enabled. - * - * @param ctx Cache context. - * @return All nodes on which cache with the same name is started. - */ - public static Collection<ClusterNode> affinityNodes(final GridCacheContext ctx) { - return ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), AffinityTopologyVersion.NONE); - } - - /** * Gets DHT affinity nodes. * * @param ctx Cache context. - * @param topOrder Maximum allowed node order. - * @return Affinity nodes. + * @param topVer Topology version. + * @return Cache affinity nodes for given topology version. */ - public static Collection<ClusterNode> affinityNodes(GridCacheContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), topOrder); + public static Collection<ClusterNode> affinityNodes(GridCacheContext ctx, AffinityTopologyVersion topVer) { + return ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), topVer); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 299394f..e994113 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -120,17 +120,20 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** * @param cctx Context. + * @param discoCache Discovery data cache. * @param grpId Group ID. * @param parts Number of partitions in the group. * @param similarAffKey Key to find caches with similar affinity. */ public GridClientPartitionTopology( GridCacheSharedContext<?, ?> cctx, + DiscoCache discoCache, int grpId, int parts, Object similarAffKey ) { this.cctx = cctx; + this.discoCache = discoCache; this.grpId = grpId; this.similarAffKey = similarAffKey; this.parts = parts; @@ -338,7 +341,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); + ClusterNode oldest = discoCache.oldestAliveServerNode(); assert oldest != null; @@ -535,7 +538,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { * @return List of nodes for the partition. */ private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { - Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.allNodesWithCaches()) : null; + Collection<UUID> allIds = F.nodeIds(discoCache.cacheGroupAffinityNodes(grpId)); lock.readLock().lock(); @@ -961,10 +964,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { assert nodeId.equals(cctx.localNodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); + ClusterNode oldest = discoCache.oldestAliveServerNode(); // If this node became the oldest node. - if (oldest.id().equals(cctx.localNodeId())) { + if (cctx.localNode().equals(oldest)) { long seq = node2part.updateSequence(); if (seq != updateSeq) { http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index f7f71a1..01d972d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -326,7 +326,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (grp.affinityNode()) { ClusterNode loc = ctx.localNode(); - ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); + ClusterNode oldest = discoCache.oldestAliveServerNode(); GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); @@ -466,7 +466,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { lastTopChangeVer = readyTopVer = evts.topologyVersion(); } - ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); + ClusterNode oldest = discoCache.oldestAliveServerNode(); if (log.isDebugEnabled()) { log.debug("Partition map beforeExchange [exchId=" + exchFut.exchangeId() + @@ -2047,7 +2047,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private long updateLocal(int p, GridDhtPartitionState state, long updateSeq, AffinityTopologyVersion affVer) { assert lock.isWriteLockedByCurrentThread(); - ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); + ClusterNode oldest = discoCache.oldestAliveServerNode(); assert oldest != null || ctx.kernalContext().clientNode(); http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 5b8a7b5..ab5631e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -34,10 +34,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; -import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; @@ -313,24 +313,10 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa /** * Prepares next batch of entries in dht transaction. * - * @param reads Read entries. - * @param writes Write entries. - * @param verMap Version map. - * @param msgId Message ID. - * @param nearMiniId Near mini future ID. - * @param txNodes Transaction nodes mapping. - * @param last {@code True} if this is last prepare request. + * @param req Prepare request. * @return Future that will be completed when locks are acquired. */ - public final IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync( - @Nullable Collection<IgniteTxEntry> reads, - @Nullable Collection<IgniteTxEntry> writes, - Map<IgniteTxKey, GridCacheVersion> verMap, - long msgId, - int nearMiniId, - Map<UUID, Collection<UUID>> txNodes, - boolean last - ) { + public final IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(GridNearTxPrepareRequest req) { // In optimistic mode prepare still can be called explicitly from salvageTx. GridDhtTxPrepareFuture fut = prepFut; @@ -344,14 +330,14 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa cctx, this, timeout, - nearMiniId, - verMap, - last, + req.miniId(), + req.dhtVersions(), + req.last(), needReturnValue()))) { GridDhtTxPrepareFuture f = prepFut; - assert f.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " + - "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']'; + assert f.nearMiniId() == req.miniId() : "Wrong near mini id on existing future " + + "[futMiniId=" + f.nearMiniId() + ", miniId=" + req.miniId() + ", fut=" + f + ']'; if (timeout == -1) f.onError(timeoutException()); @@ -360,8 +346,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } } else { - assert fut.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " + - "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + ']'; + assert fut.nearMiniId() == req.miniId() : "Wrong near mini id on existing future " + + "[futMiniId=" + fut.nearMiniId() + ", miniId=" + req.miniId() + ", fut=" + fut + ']'; // Prepare was called explicitly. return chainOnePhasePrepare(fut); @@ -389,14 +375,14 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } try { - if (reads != null) { - for (IgniteTxEntry e : reads) - addEntry(msgId, e); + if (req.reads() != null) { + for (IgniteTxEntry e : req.reads()) + addEntry(req.messageId(), e); } - if (writes != null) { - for (IgniteTxEntry e : writes) - addEntry(msgId, e); + if (req.writes() != null) { + for (IgniteTxEntry e : req.writes()) + addEntry(req.messageId(), e); } userPrepare(null); @@ -407,7 +393,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa if (isSystemInvalidate()) fut.complete(); else - fut.prepare(reads, writes, txNodes); + fut.prepare(req); } catch (IgniteTxTimeoutCheckedException | IgniteTxOptimisticCheckedException e) { fut.onError(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index ce3c290..1c5e1a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; @@ -168,14 +169,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite @SuppressWarnings("UnusedDeclaration") private volatile int mapped; - /** Prepare reads. */ - private Iterable<IgniteTxEntry> reads; - - /** Prepare writes. */ - private Iterable<IgniteTxEntry> writes; - - /** Tx nodes. */ - private Map<UUID, Collection<UUID>> txNodes; + /** Prepare request. */ + private GridNearTxPrepareRequest req; /** Trackable flag. */ private boolean trackable = true; @@ -341,7 +336,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite private void onEntriesLocked() { ret = new GridCacheReturn(null, tx.localResult(), true, null, true); - for (IgniteTxEntry writeEntry : writes) { + for (IgniteTxEntry writeEntry : req.writes()) { IgniteTxEntry txEntry = tx.entry(writeEntry.txKey()); assert txEntry != null : writeEntry; @@ -597,10 +592,10 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite if (log.isDebugEnabled()) log.debug("Marking all local candidates as ready: " + this); - readyLocks(writes); + readyLocks(req.writes()); if (tx.serializable() && tx.optimistic()) - readyLocks(reads); + readyLocks(req.reads()); locksReady = true; } @@ -896,8 +891,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite */ private void addDhtValues(GridNearTxPrepareResponse res) { // Interceptor on near node needs old values to execute callbacks. - if (!F.isEmpty(writes)) { - for (IgniteTxEntry e : writes) { + if (!F.isEmpty(req.writes())) { + for (IgniteTxEntry e : req.writes()) { IgniteTxEntry txEntry = tx.entry(e.txKey()); assert txEntry != null : "Missing tx entry for key [tx=" + tx + ", key=" + e.txKey() + ']'; @@ -1002,33 +997,30 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite /** * Initializes future. * - * @param reads Read entries. - * @param writes Write entries. - * @param txNodes Transaction nodes mapping. + * @param req Prepare request. */ @SuppressWarnings("TypeMayBeWeakened") - public void prepare(Collection<IgniteTxEntry> reads, Collection<IgniteTxEntry> writes, - Map<UUID, Collection<UUID>> txNodes) { + public void prepare(GridNearTxPrepareRequest req) { + assert req != null; + if (tx.empty()) { tx.setRollbackOnly(); onDone((GridNearTxPrepareResponse)null); } - this.reads = reads; - this.writes = writes; - this.txNodes = txNodes; + this.req = req; boolean ser = tx.serializable() && tx.optimistic(); - if (!F.isEmpty(writes) || (ser && !F.isEmpty(reads))) { + if (!F.isEmpty(req.writes()) || (ser && !F.isEmpty(req.reads()))) { Map<Integer, Collection<KeyCacheObject>> forceKeys = null; - for (IgniteTxEntry entry : writes) + for (IgniteTxEntry entry : req.writes()) forceKeys = checkNeedRebalanceKeys(entry, forceKeys); if (ser) { - for (IgniteTxEntry entry : reads) + for (IgniteTxEntry entry : req.reads()) forceKeys = checkNeedRebalanceKeys(entry, forceKeys); } @@ -1196,10 +1188,10 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite IgniteCheckedException err0; try { - err0 = checkReadConflict(writes); + err0 = checkReadConflict(req.writes()); if (err0 == null) - err0 = checkReadConflict(reads); + err0 = checkReadConflict(req.reads()); } catch (IgniteCheckedException e) { U.error(log, "Failed to check entry version: " + e, e); @@ -1230,258 +1222,262 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite // We are holding transaction-level locks for entries here, so we can get next write version. tx.writeVersion(cctx.versions().next(tx.topologyVersion())); - { - // Assign keys to primary nodes. - if (!F.isEmpty(writes)) { - for (IgniteTxEntry write : writes) - map(tx.entry(write.txKey())); - } + // Assign keys to primary nodes. + if (!F.isEmpty(req.writes())) { + for (IgniteTxEntry write : req.writes()) + map(tx.entry(write.txKey())); + } - if (!F.isEmpty(reads)) { - for (IgniteTxEntry read : reads) - map(tx.entry(read.txKey())); - } + if (!F.isEmpty(req.reads())) { + for (IgniteTxEntry read : req.reads()) + map(tx.entry(read.txKey())); } if (isDone()) return; - if (last) { - if (tx.onePhaseCommit() && !tx.nearMap().isEmpty()) { - for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) { - if (!tx.dhtMap().containsKey(nearMapping.primary().id())) { - tx.onePhaseCommit(false); + if (last) + sendPrepareRequests(); + } + finally { + markInitialized(); + } + } - break; - } - } - } + /** + * + */ + private void sendPrepareRequests() { + if (tx.onePhaseCommit() && !tx.nearMap().isEmpty()) { + for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) { + if (!tx.dhtMap().containsKey(nearMapping.primary().id())) { + tx.onePhaseCommit(false); - int miniId = 0; + break; + } + } + } - assert tx.transactionNodes() != null; + int miniId = 0; - final long timeout = timeoutObj != null ? timeoutObj.timeout : 0; + assert tx.transactionNodes() != null; - // Create mini futures. - for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) { - assert !dhtMapping.empty(); + final long timeout = timeoutObj != null ? timeoutObj.timeout : 0; - ClusterNode n = dhtMapping.primary(); + // Create mini futures. + for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) { + assert !dhtMapping.empty(); - assert !n.isLocal(); + ClusterNode n = dhtMapping.primary(); - GridDistributedTxMapping nearMapping = tx.nearMap().get(n.id()); + assert !n.isLocal(); - Collection<IgniteTxEntry> nearWrites = nearMapping == null ? null : nearMapping.writes(); + GridDistributedTxMapping nearMapping = tx.nearMap().get(n.id()); - Collection<IgniteTxEntry> dhtWrites = dhtMapping.writes(); + Collection<IgniteTxEntry> nearWrites = nearMapping == null ? null : nearMapping.writes(); - if (F.isEmpty(dhtWrites) && F.isEmpty(nearWrites)) - continue; + Collection<IgniteTxEntry> dhtWrites = dhtMapping.writes(); - if (tx.remainingTime() == -1) - return; + if (F.isEmpty(dhtWrites) && F.isEmpty(nearWrites)) + continue; - MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping); + if (tx.remainingTime() == -1) + return; - add(fut); // Append new future. + MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping); + + add(fut); // Append new future. + + assert req.transactionNodes() != null; + + GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest( + futId, + fut.futureId(), + tx.topologyVersion(), + tx, + timeout, + dhtWrites, + nearWrites, + this.req.transactionNodes(), + tx.nearXidVersion(), + true, + tx.onePhaseCommit(), + tx.subjectId(), + tx.taskNameHash(), + tx.activeCachesDeploymentEnabled(), + tx.storeWriteThrough(), + retVal); + + int idx = 0; + + for (IgniteTxEntry entry : dhtWrites) { + try { + GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached(); - assert txNodes != null; + GridCacheContext<?, ?> cacheCtx = cached.context(); - GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest( - futId, - fut.futureId(), - tx.topologyVersion(), - tx, - timeout, - dhtWrites, - nearWrites, - txNodes, - tx.nearXidVersion(), - true, - tx.onePhaseCommit(), - tx.subjectId(), - tx.taskNameHash(), - tx.activeCachesDeploymentEnabled(), - tx.storeWriteThrough(), - retVal); + // Do not invalidate near entry on originating transaction node. + req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) && + cached.readerId(n.id()) != null); - int idx = 0; + if (cached.isNewLocked()) { + List<ClusterNode> owners = cacheCtx.topology().owners(cached.partition(), + tx != null ? tx.topologyVersion() : cacheCtx.affinity().affinityTopologyVersion()); - for (IgniteTxEntry entry : dhtWrites) { - try { - GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached(); + // Do not preload if local node is a partition owner. + if (!owners.contains(cctx.localNode())) + req.markKeyForPreload(idx); + } - GridCacheContext<?, ?> cacheCtx = cached.context(); + break; + } + catch (GridCacheEntryRemovedException ignore) { + assert false : "Got removed exception on entry with dht local candidate: " + entry; + } - // Do not invalidate near entry on originating transaction node. - req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) && - cached.readerId(n.id()) != null); + idx++; + } - if (cached.isNewLocked()) { - List<ClusterNode> owners = cacheCtx.topology().owners(cached.partition(), - tx != null ? tx.topologyVersion() : cacheCtx.affinity().affinityTopologyVersion()); + if (!F.isEmpty(nearWrites)) { + for (IgniteTxEntry entry : nearWrites) { + try { + if (entry.explicitVersion() == null) { + GridCacheMvccCandidate added = entry.cached().candidate(version()); - // Do not preload if local node is a partition owner. - if (!owners.contains(cctx.localNode())) - req.markKeyForPreload(idx); - } + assert added != null : "Missing candidate for cache entry:" + entry; + assert added.dhtLocal(); - break; - } - catch (GridCacheEntryRemovedException ignore) { - assert false : "Got removed exception on entry with dht local candidate: " + entry; + if (added.ownerVersion() != null) + req.owned(entry.txKey(), added.ownerVersion()); } - idx++; + break; } - - if (!F.isEmpty(nearWrites)) { - for (IgniteTxEntry entry : nearWrites) { - try { - if (entry.explicitVersion() == null) { - GridCacheMvccCandidate added = entry.cached().candidate(version()); - - assert added != null : "Missing candidate for cache entry:" + entry; - assert added.dhtLocal(); - - if (added.ownerVersion() != null) - req.owned(entry.txKey(), added.ownerVersion()); - } - - break; - } - catch (GridCacheEntryRemovedException ignore) { - assert false : "Got removed exception on entry with dht local candidate: " + entry; - } - } + catch (GridCacheEntryRemovedException ignore) { + assert false : "Got removed exception on entry with dht local candidate: " + entry; } + } + } - assert req.transactionNodes() != null; + assert req.transactionNodes() != null; - try { - cctx.io().send(n, req, tx.ioPolicy()); + try { + cctx.io().send(n, req, tx.ioPolicy()); - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, sent request dht [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + n.id() + ']'); - } - } - catch (ClusterTopologyCheckedException ignored) { - fut.onNodeLeft(); + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, sent request dht [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + n.id() + ']'); + } + } + catch (ClusterTopologyCheckedException ignored) { + fut.onNodeLeft(); + } + catch (IgniteCheckedException e) { + if (!cctx.kernalContext().isStopping()) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to send request dht [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + n.id() + ']'); } - catch (IgniteCheckedException e) { - if (!cctx.kernalContext().isStopping()) { - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, failed to send request dht [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + n.id() + ']'); - } - fut.onResult(e); - } - else { - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, failed to send request dht, ignore [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + n.id() + - ", err=" + e + ']'); - } - } + fut.onResult(e); + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to send request dht, ignore [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + n.id() + + ", err=" + e + ']'); } } + } + } - for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) { - if (!tx.dhtMap().containsKey(nearMapping.primary().id())) { - if (tx.remainingTime() == -1) - return; - - MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping); - - add(fut); // Append new future. - - GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest( - futId, - fut.futureId(), - tx.topologyVersion(), - tx, - timeout, - null, - nearMapping.writes(), - tx.transactionNodes(), - tx.nearXidVersion(), - true, - tx.onePhaseCommit(), - tx.subjectId(), - tx.taskNameHash(), - tx.activeCachesDeploymentEnabled(), - tx.storeWriteThrough(), - retVal); - - for (IgniteTxEntry entry : nearMapping.entries()) { - if (CU.writes().apply(entry)) { - try { - if (entry.explicitVersion() == null) { - GridCacheMvccCandidate added = entry.cached().candidate(version()); + for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) { + if (!tx.dhtMap().containsKey(nearMapping.primary().id())) { + if (tx.remainingTime() == -1) + return; - assert added != null : "Null candidate for non-group-lock entry " + - "[added=" + added + ", entry=" + entry + ']'; - assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" + - "[added=" + added + ", entry=" + entry + ']'; + MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping); + + add(fut); // Append new future. + + GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest( + futId, + fut.futureId(), + tx.topologyVersion(), + tx, + timeout, + null, + nearMapping.writes(), + tx.transactionNodes(), + tx.nearXidVersion(), + true, + tx.onePhaseCommit(), + tx.subjectId(), + tx.taskNameHash(), + tx.activeCachesDeploymentEnabled(), + tx.storeWriteThrough(), + retVal); + + for (IgniteTxEntry entry : nearMapping.entries()) { + if (CU.writes().apply(entry)) { + try { + if (entry.explicitVersion() == null) { + GridCacheMvccCandidate added = entry.cached().candidate(version()); - if (added != null && added.ownerVersion() != null) - req.owned(entry.txKey(), added.ownerVersion()); - } + assert added != null : "Null candidate for non-group-lock entry " + + "[added=" + added + ", entry=" + entry + ']'; + assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" + + "[added=" + added + ", entry=" + entry + ']'; - break; - } catch (GridCacheEntryRemovedException ignore) { - assert false : "Got removed exception on entry with dht local candidate: " + entry; - } + if (added != null && added.ownerVersion() != null) + req.owned(entry.txKey(), added.ownerVersion()); } + + break; + } catch (GridCacheEntryRemovedException ignore) { + assert false : "Got removed exception on entry with dht local candidate: " + entry; } + } + } - assert req.transactionNodes() != null; + assert req.transactionNodes() != null; - try { - cctx.io().send(nearMapping.primary(), req, tx.ioPolicy()); + try { + cctx.io().send(nearMapping.primary(), req, tx.ioPolicy()); - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, sent request near [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + nearMapping.primary().id() + ']'); - } - } - catch (ClusterTopologyCheckedException ignored) { - fut.onNodeLeft(); + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, sent request near [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nearMapping.primary().id() + ']'); + } + } + catch (ClusterTopologyCheckedException ignored) { + fut.onNodeLeft(); + } + catch (IgniteCheckedException e) { + if (!cctx.kernalContext().isStopping()) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to send request near [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nearMapping.primary().id() + ']'); } - catch (IgniteCheckedException e) { - if (!cctx.kernalContext().isStopping()) { - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, failed to send request near [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + nearMapping.primary().id() + ']'); - } - fut.onResult(e); - } - else { - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, failed to send request near, ignore [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + nearMapping.primary().id() + - ", err=" + e + ']'); - } - } + fut.onResult(e); + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to send request near, ignore [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nearMapping.primary().id() + + ", err=" + e + ']'); } } } } } - finally { - markInitialized(); - } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 1467bfa..7d0f747 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1965,7 +1965,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (dhtFut != null) { if (req.writeSynchronizationMode() == PRIMARY_SYNC // To avoid deadlock disable back-pressure for sender data node. - && !ctx.discovery().cacheAffinityNode(node, ctx.name()) + && !ctx.discovery().cacheGroupAffinityNode(node, ctx.groupId()) && !dhtFut.isDone()) { final IgniteRunnable tracker = GridNioBackPressureControl.threadTracker(); http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index c72f53e..053bbe5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -712,7 +712,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } if (map == null) { - Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer); + Collection<ClusterNode> affNodes = CU.affinityNodes(ctx, topVer); keyCnt = (int)Math.ceil((double)keys.size() / affNodes.size()); @@ -815,7 +815,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte AffinityTopologyVersion topVer = lock.topologyVersion(); if (map == null) { - Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer); + Collection<ClusterNode> affNodes = CU.affinityNodes(ctx, topVer); keyCnt = (int)Math.ceil((double)keys.size() / affNodes.size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 240b5f0..9589f09 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2247,7 +2247,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId()); GridDhtPartitionTopology top = grp != null ? grp.topology() : - cctx.exchange().clientTopology(desc.groupId()); + cctx.exchange().clientTopology(desc.groupId(), events().discoveryCache()); top.beforeExchange(this, true, true); } @@ -2265,7 +2265,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte CacheGroupContext grp = cctx.cache().cacheGroup(grpId); GridDhtPartitionTopology top = grp != null ? grp.topology() : - cctx.exchange().clientTopology(grpId); + cctx.exchange().clientTopology(grpId, events().discoveryCache()); CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId, top.partitions()); @@ -2448,7 +2448,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (grpCtx != null) top = grpCtx.topology(); else - top = cctx.exchange().clientTopology(e.getKey()); + top = cctx.exchange().clientTopology(e.getKey(), events().discoveryCache()); assignPartitionStates(top); } @@ -2826,10 +2826,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte null); } else { - ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); + ClusterNode oldest = cctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE); if (oldest != null && oldest.isLocal()) { - GridDhtPartitionTopology top = cctx.exchange().clientTopology(grpId); + GridDhtPartitionTopology top = cctx.exchange().clientTopology(grpId, events().discoveryCache()); CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId, top.partitions()); @@ -2858,7 +2858,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte CacheGroupContext grp = cctx.cache().cacheGroup(grpId); GridDhtPartitionTopology top = grp != null ? grp.topology() : - cctx.exchange().clientTopology(grpId); + cctx.exchange().clientTopology(grpId, events().discoveryCache()); top.update(exchId, entry.getValue(), false); } http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index a691cbc..973a199 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -540,7 +540,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> ver = cand.version(); if (map == null) { - Collection<ClusterNode> affNodes = CU.allNodes(ctx, cand.topologyVersion()); + Collection<ClusterNode> affNodes = CU.affinityNodes(ctx, cand.topologyVersion()); if (F.isEmpty(affNodes)) return; @@ -663,7 +663,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> if (cand != null) { if (map == null) { - Collection<ClusterNode> affNodes = CU.allNodes(ctx, cand.topologyVersion()); + Collection<ClusterNode> affNodes = CU.affinityNodes(ctx, cand.topologyVersion()); if (F.isEmpty(affNodes)) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 55d6bdd..8ecf21f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -105,14 +105,12 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER; import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER; -import static org.apache.ignite.transactions.TransactionState.ACTIVE; import static org.apache.ignite.transactions.TransactionState.COMMITTED; import static org.apache.ignite.transactions.TransactionState.COMMITTING; import static org.apache.ignite.transactions.TransactionState.PREPARED; import static org.apache.ignite.transactions.TransactionState.PREPARING; import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK; -import static org.apache.ignite.transactions.TransactionState.SUSPENDED; import static org.apache.ignite.transactions.TransactionState.UNKNOWN; /** @@ -3338,19 +3336,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea /** * Prepares next batch of entries in dht transaction. * - * @param reads Read entries. - * @param writes Write entries. - * @param txNodes Transaction nodes mapping. - * @param last {@code True} if this is last prepare request. + * @param req Prepare request. * @return Future that will be completed when locks are acquired. */ @SuppressWarnings("TypeMayBeWeakened") - public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal( - @Nullable Collection<IgniteTxEntry> reads, - @Nullable Collection<IgniteTxEntry> writes, - Map<UUID, Collection<UUID>> txNodes, - boolean last - ) { + public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal(GridNearTxPrepareRequest req) { long timeout = remainingTime(); if (state() != PREPARING) { @@ -3375,11 +3365,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea timeout, 0, Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), - last, + req.last(), needReturnValue() && implicit()); try { - userPrepare((serializable() && optimistic()) ? F.concat(false, writes, reads) : writes); + userPrepare((serializable() && optimistic()) ? F.concat(false, req.writes(), req.reads()) : req.writes()); // Make sure to add future before calling prepare on it. cctx.mvcc().addFuture(fut); @@ -3387,7 +3377,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea if (isSystemInvalidate()) fut.complete(); else - fut.prepare(reads, writes, txNodes); + fut.prepare(req); } catch (IgniteTxTimeoutCheckedException | IgniteTxOptimisticCheckedException e) { fut.onError(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index c4eae8c..b5fdd23 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -600,7 +600,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); - Collection<ClusterNode> affNodes = CU.affinityNodes(cctx); + Collection<ClusterNode> affNodes = CU.affinityNodes(cctx, topVer); if (prj == null && part == null) return affNodes; http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 3e27720..f873461 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -1859,11 +1859,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte Callable<Collection<CacheSqlMetadata>> job = new MetadataJob(); // Remote nodes that have current cache. - Collection<ClusterNode> nodes = F.view(cctx.discovery().remoteNodes(), new P1<ClusterNode>() { - @Override public boolean apply(ClusterNode n) { - return cctx.kernalContext().discovery().cacheAffinityNode(n, cacheName); - } - }); + Collection<ClusterNode> nodes = CU.affinityNodes(cctx, AffinityTopologyVersion.NONE); Collection<Collection<CacheSqlMetadata>> res = new ArrayList<>(nodes.size() + 1); http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index beeb184..362eaac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -254,11 +254,7 @@ public class IgniteTxHandler { ) { req.txState(locTx.txState()); - IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal( - req.reads(), - req.writes(), - req.transactionNodes(), - req.last()); + IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(req); if (locTx.isRollbackOnly()) locTx.rollbackNearTxLocalAsync(); @@ -520,14 +516,7 @@ public class IgniteTxHandler { if (req.needReturnValue()) tx.needReturnValue(true); - IgniteInternalFuture<GridNearTxPrepareResponse> fut = tx.prepareAsync( - req.reads(), - req.writes(), - req.dhtVersions(), - req.messageId(), - req.miniId(), - req.transactionNodes(), - req.last()); + IgniteInternalFuture<GridNearTxPrepareResponse> fut = tx.prepareAsync(req); if (tx.isRollbackOnly() && !tx.commitOnPrepare()) { if (tx.state() != TransactionState.ROLLED_BACK && tx.state() != TransactionState.ROLLING_BACK) http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 2b3a882..f750053 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -1489,7 +1489,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite if (!cache.context().affinityNode()) { ClusterNode oldestSrvNode = - ctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); + ctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE); if (oldestSrvNode == null) return new GridEmptyIterator<>(); @@ -1730,7 +1730,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite // In case the cache instance isn't tracked by DiscoveryManager anymore. discoCache.updateAlives(ctx.discovery()); - ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); + ClusterNode oldest = discoCache.oldestAliveServerNode(); if (oldest != null && oldest.isLocal()) { final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java index 695d8a6..95e9479 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java @@ -2826,7 +2826,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { assertNotNull("Failed to find exchange future:", evt); - Collection<ClusterNode> allNodes = ctx.discovery().cacheNodes(topVer0); + Collection<ClusterNode> allNodes = ctx.discovery().serverNodes(topVer0); for (DynamicCacheDescriptor cacheDesc : ctx.cache().cacheDescriptors().values()) { if (assignments.get(cacheDesc.cacheId()) != null)
