Repository: ignite Updated Branches: refs/heads/ignite-5578 3dc2a17d6 -> 39ec42c7e
ignite-5578 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/39ec42c7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/39ec42c7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/39ec42c7 Branch: refs/heads/ignite-5578 Commit: 39ec42c7e42ff504c2dccc291bec1a7fb6d54974 Parents: 3dc2a17 Author: sboikov <[email protected]> Authored: Wed Aug 2 18:22:05 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Aug 2 18:48:47 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 12 +++-- .../GridDhtPartitionsExchangeFuture.java | 48 +++++++++++--------- .../distributed/CacheExchangeMergeTest.java | 6 +-- 3 files changed, 38 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/39ec42c7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index b28cd95..a785477 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -1239,13 +1239,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap return grpHolder.affinity(); } - public void mergeExchangesOnServerLeft(final GridDhtPartitionsExchangeFuture fut, final GridDhtPartitionsFullMessage msg) { + /** + * @param fut Current exchange future. + * @param msg Finish exchange message. 
+ */ + public void mergeExchangesOnServerLeft(final GridDhtPartitionsExchangeFuture fut, + final GridDhtPartitionsFullMessage msg) { final Map<Long, ClusterNode> nodesByOrder = new HashMap<>(); final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>(); - log.info("mergeExchangesOnServerLeft [topVer=" + fut.context().events().discoveryCache().version() + ']'); - forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { ExchangeDiscoveryEvents evts = fut.context().events(); @@ -2045,7 +2048,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** + * @param topVer Topology version. * @param fut Exchange future. + * @param c Closure converting affinity diff. + * @param initAff {@code True} if need initialize affinity. * @return Affinity assignment. * @throws IgniteCheckedException If failed. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/39ec42c7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 1a5a8e2..3579f3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -130,6 +130,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte @GridToStringExclude private final Set<UUID> remaining = new HashSet<>(); 
+ /** */ + @GridToStringExclude + private final Object mux = new Object(); + /** Guarded by mux */ @GridToStringExclude private int pendingSingleUpdates; @@ -805,7 +809,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte changeGlobalStateE = e; if (crd) { - synchronized (this) { + synchronized (mux) { changeGlobalStateExceptions.put(cctx.localNodeId(), e); } } @@ -1567,7 +1571,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte public boolean mergeJoinExchange(GridDhtPartitionsExchangeFuture fut) { boolean wait; - synchronized (this) { + synchronized (mux) { assert (!isDone() && !initFut.isDone()) || cctx.kernalContext().isStopping() : this; assert (mergedWith == null && state == null) || cctx.kernalContext().isStopping() : this; @@ -1588,7 +1592,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @return Pending join request if any. */ @Nullable public GridDhtPartitionsSingleMessage mergeJoinExchangeOnDone(GridDhtPartitionsExchangeFuture fut) { - synchronized (this) { + synchronized (mux) { assert !isDone(); assert !initFut.isDone(); assert mergedWith == null; @@ -1617,7 +1621,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte FinishState finishState0 = null; - synchronized (this) { + synchronized (mux) { if (state == ExchangeLocalState.DONE) { assert finishState != null; @@ -1672,7 +1676,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (msg.restoreState()) { InitNewCoordinatorFuture newCrdFut0; - synchronized (this) { + synchronized (mux) { assert newCrdFut != null; newCrdFut0 = newCrdFut; @@ -1691,7 +1695,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte GridDhtPartitionsExchangeFuture mergedWith0 = null; - synchronized (this) { + synchronized (mux) { if (state == ExchangeLocalState.MERGED) { assert mergedWith != null; @@ -1733,7 +1737,7 @@ public 
class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { FinishState finishState0; - synchronized (GridDhtPartitionsExchangeFuture.this) { + synchronized (mux) { finishState0 = finishState; } @@ -1758,7 +1762,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte FinishState finishState0; - synchronized (GridDhtPartitionsExchangeFuture.this) { + synchronized (mux) { finishState0 = finishState; } @@ -1794,7 +1798,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte FinishState finishState0 = null; - synchronized (this) { + synchronized (mux) { assert crd != null; switch (state) { @@ -1858,7 +1862,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte updatePartitionSingleMap(nodeId, msg); } finally { - synchronized (this) { + synchronized (mux) { assert pendingSingleUpdates > 0; pendingSingleUpdates--; @@ -2154,7 +2158,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte Map<Integer, CacheGroupAffinityMessage> idealAffDiff = null; if (exchCtx.mergeExchanges()) { - synchronized (this) { + synchronized (mux) { if (mergedJoinExchMsgs != null) { for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : mergedJoinExchMsgs.entrySet()) { msgs.put(e.getKey(), e.getValue()); @@ -2256,7 +2260,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte msg.prepareMarshal(cctx); - synchronized (this) { + synchronized (mux) { finishState = new FinishState(crd.id(), resTopVer, msg); state = ExchangeLocalState.DONE; @@ -2282,7 +2286,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs0; - synchronized (this) { + synchronized (mux) { srvNodes.remove(cctx.localNode()); nodes = U.newHashSet(srvNodes.size()); @@ -2461,7 +2465,7 @@ public class 
GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private void processSinglePartitionRequest(ClusterNode node, GridDhtPartitionsSingleRequest msg) { FinishState finishState0 = null; - synchronized (this) { + synchronized (mux) { if (crd == null) { log.info("Ignore partitions request, no coordinator [node=" + node.id() + ']'); @@ -2577,7 +2581,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (checkCrd) { assert node != null; - synchronized (this) { + synchronized (mux) { if (crd == null) { log.info("Ignore full message, all server nodes left: " + msg); @@ -2881,14 +2885,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte InitNewCoordinatorFuture newCrdFut0; - synchronized (this) { + synchronized (mux) { newCrdFut0 = newCrdFut; } if (newCrdFut0 != null) newCrdFut0.onNodeLeft(node.id()); - synchronized (this) { + synchronized (mux) { if (!srvNodes.remove(node)) return; @@ -3046,7 +3050,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte log.info("New coordinator restored state [ver=" + initialVersion() + ", resVer=" + fullMsg.resultTopologyVersion() + ']'); - synchronized (this) { + synchronized (mux) { state = ExchangeLocalState.DONE; finishState = new FinishState(crd.id(), fullMsg.resultTopologyVersion(), fullMsg); @@ -3097,7 +3101,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte allRcvd = true; - synchronized (this) { + synchronized (mux) { remaining.clear(); // Do not process messages. 
assert crd != null && crd.isLocal(); @@ -3110,7 +3114,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte else { Set<UUID> remaining0 = null; - synchronized (this) { + synchronized (mux) { assert crd != null && crd.isLocal(); state = ExchangeLocalState.CRD; @@ -3215,7 +3219,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte ClusterNode crd; Set<UUID> remaining; - synchronized (this) { + synchronized (mux) { crd = this.crd; remaining = new HashSet<>(this.remaining); } @@ -3249,7 +3253,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte @Override public String toString() { Set<UUID> remaining; - synchronized (this) { + synchronized (mux) { remaining = new HashSet<>(this.remaining); } http://git-wip-us.apache.org/repos/asf/ignite/blob/39ec42c7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java index ac9390e..b32164f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java @@ -1148,7 +1148,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { ", cache=" + cacheName + ']'; for (int i = 0; i < 5; i++) { - Integer key = rnd.nextInt(100_000); + Integer key = rnd.nextInt(20_000); cache.put(key, i); @@ -1161,7 +1161,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { Map<Integer, Integer> map = new TreeMap<>(); for (int j = 0; j < 10; j++) { - Integer key = rnd.nextInt(100_000); + Integer 
key = rnd.nextInt(20_000); map.put(key, i); } @@ -1204,7 +1204,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { try (Transaction tx = node.transactions().txStart(concurrency, isolation)) { for (int i = 0; i < 5; i++) { - Integer key = rnd.nextInt(100_000); + Integer key = rnd.nextInt(20_000); cache.put(key, i);
