Repository: ignite Updated Branches: refs/heads/master e0e02abaa -> 2bdc89827
IGNITE-9710 Exchange worker liveness checking improvements Signed-off-by: Andrey Gura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2bdc8982 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2bdc8982 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2bdc8982 Branch: refs/heads/master Commit: 2bdc8982796626f41f39f7ae28af12966b04a24f Parents: e0e02ab Author: Andrey Kuznetsov <[email protected]> Authored: Wed Oct 17 17:25:54 2018 +0300 Committer: Andrey Gura <[email protected]> Committed: Wed Oct 17 17:25:54 2018 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 35 +- .../processors/cache/GridCacheMvccManager.java | 2 + .../GridCachePartitionExchangeManager.java | 7 + .../processors/cache/GridCacheProcessor.java | 6 + .../GridDhtPartitionsExchangeFuture.java | 366 +++++++++++++++---- 5 files changed, 341 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2bdc8982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 9cbceb1..cedbde1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -41,7 +41,6 @@ import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.DiscoCache; @@ -729,6 +728,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap return; aff.clientEventTopologyChange(evts.lastEvent(), evts.topologyVersion()); + + cctx.exchange().exchangerUpdateHeartbeat(); } }); } @@ -1154,6 +1155,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } else aff.clientEventTopologyChange(exchFut.firstEvent(), topVer); + + cctx.exchange().exchangerUpdateHeartbeat(); } }); } @@ -1174,6 +1177,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap AffinityTopologyVersion topVer = fut.initialVersion(); aff.clientEventTopologyChange(fut.firstEvent(), topVer); + + cctx.exchange().exchangerUpdateHeartbeat(); } }); } @@ -1318,16 +1323,22 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { CacheGroupHolder cache = groupHolder(fut.initialVersion(), desc); - if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) + if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) { calculateAndInit(fut.events(), cache.affinity(), fut.initialVersion()); + + cctx.exchange().exchangerUpdateHeartbeat(); + } } }); } else { forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { - if (aff.lastVersion().equals(AffinityTopologyVersion.NONE)) + if (aff.lastVersion().equals(AffinityTopologyVersion.NONE)) { initAffinity(cachesRegistry.group(aff.groupId()), aff, fut); + + cctx.exchange().exchangerUpdateHeartbeat(); + } } }); } @@ -1662,6 +1673,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap CacheGroupHolder grpHolder = groupHolder(topVer, desc); calculateAndInit(fut.events(), grpHolder.affinity(), topVer); + + cctx.exchange().exchangerUpdateHeartbeat(); } }); } @@ -1791,6 +1804,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap calculateAndInit(fut.events(), grp.affinity(), topVer); } } + + cctx.exchange().exchangerUpdateHeartbeat(); } for (int i = 0; i < fetchFuts.size(); i++) { @@ -1804,6 +1819,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap fut.events().discoveryCache(), cctx.cache().cacheGroup(grpId).affinity(), fetchFut); + + cctx.exchange().exchangerUpdateHeartbeat(); } } @@ -1874,6 +1891,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap CacheGroupHolder cache = groupHolder(fut.initialVersion(), desc); cache.aff.calculate(fut.initialVersion(), fut.events(), fut.events().discoveryCache()); + + cctx.exchange().exchangerUpdateHeartbeat(); } }); } @@ -1881,6 +1900,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { aff.calculate(fut.initialVersion(), fut.events(), fut.events().discoveryCache()); + + cctx.exchange().exchangerUpdateHeartbeat(); } }); } @@ -1977,6 +1998,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap aff.calculate(topVer, fut.events(), fut.events().discoveryCache()); affFut.onDone(topVer); + + cctx.exchange().exchangerUpdateHeartbeat(); } }); @@ -1999,6 +2022,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap CacheGroupHolder old = grpHolders.put(grpHolder.groupId(), grpHolder); assert old == null : old; + + cctx.exchange().exchangerUpdateHeartbeat(); } }); @@ -2075,6 +2100,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap null, grp.rebalanceEnabled(), affCache); + + cctx.exchange().exchangerUpdateHeartbeat(); } }); @@ -2111,6 +2138,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap for (GridDhtPartitionMap map0 : map.values()) cache.topology(fut.context().events().discoveryCache()).update(fut.exchangeId(), map0, true); } + + cctx.exchange().exchangerUpdateHeartbeat(); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/2bdc8982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index 690b15a..16324de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -367,6 +367,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { log.debug("Attempted to remove node locks from removed entry in mvcc manager " + "disco callback (will ignore): " + entry); } + + cctx.exchange().exchangerUpdateHeartbeat(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2bdc8982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index dbfc3e4..0baf5a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -2258,6 +2258,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * Invokes {@link GridWorker#updateHeartbeat()} for exchange worker. + */ + public void exchangerUpdateHeartbeat() { + exchWorker.updateHeartbeat(); + } + + /** * Invokes {@link GridWorker#blockingSectionBegin()} for exchange worker. * Should be called from exchange worker thread. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/2bdc8982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 68698ec..ec88a93 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -2026,6 +2026,8 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor t.get2(), exchTopVer, false); + + context().exchange().exchangerUpdateHeartbeat(); } if (log.isInfoEnabled()) @@ -2064,6 +2066,8 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor null, exchTopVer, false); + + context().exchange().exchangerUpdateHeartbeat(); } } @@ -5030,6 +5034,8 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor // Make sure to remove future before completion. pendingFuts.remove(id, this); + context().exchange().exchangerUpdateHeartbeat(); + return super.onDone(res, err); } http://git-wip-us.apache.org/repos/asf/ignite/blob/2bdc8982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 32cd0d4..e550a8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -86,13 +86,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.processors.cache.LocalJoinCachesContext; import org.apache.ignite.internal.processors.cache.StateChangeRequest; import org.apache.ignite.internal.processors.cache.WalStateAbstractMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridClientPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsStateValidator; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -633,10 +633,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (newCrd) { IgniteInternalFuture<?> fut = cctx.affinity().initCoordinatorCaches(this, false); - if (fut != null) + if (fut != null) { fut.get(); + cctx.exchange().exchangerUpdateHeartbeat(); + } + cctx.exchange().onCoordinatorInitialized(); + + cctx.exchange().exchangerUpdateHeartbeat(); } } @@ -654,7 +659,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte initTs = U.currentTimeMillis(); - U.await(evtLatch); + cctx.exchange().exchangerBlockingSectionBegin(); + + try { + U.await(evtLatch); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } assert firstDiscoEvt != null : this; assert exchId.nodeId().equals(firstDiscoEvt.eventNode().id()) : this; @@ -680,7 +692,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte exchCtx = new ExchangeContext(crdNode, mvccCrdChange, this); - cctx.kernalContext().coordinators().onExchangeStart(mvccCrd, exchCtx, crd); + cctx.exchange().exchangerBlockingSectionBegin(); + + try { + cctx.kernalContext().coordinators().onExchangeStart(mvccCrd, exchCtx, crd); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } assert state == null : state; @@ -812,8 +831,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert false; } - if (cctx.localNode().isClient()) - tryToPerformLocalSnapshotOperation(); + if (cctx.localNode().isClient()) { + cctx.exchange().exchangerBlockingSectionBegin(); + + try { + tryToPerformLocalSnapshotOperation(); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } + } if (exchLog.isInfoEnabled()) exchLog.info("Finished exchange init [topVer=" + topVer + ", crd=" + crdNode + ']'); @@ -845,15 +872,29 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte */ private IgniteInternalFuture<?> initCachesOnLocalJoin() throws IgniteCheckedException { if (isLocalNodeNotInBaseline()) { - cctx.cache().cleanupCachesDirectories(); + cctx.exchange().exchangerBlockingSectionBegin(); - cctx.database().cleanupCheckpointDirectory(); + try { + cctx.cache().cleanupCachesDirectories(); + + cctx.database().cleanupCheckpointDirectory(); - if (cctx.wal() != null) - cctx.wal().cleanupWalDirectories(); + if (cctx.wal() != null) + cctx.wal().cleanupWalDirectories(); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } } - cctx.activate(); + cctx.exchange().exchangerBlockingSectionBegin(); + + try { + cctx.activate(); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } LocalJoinCachesContext locJoinCtx = exchActions == null ? null : exchActions.localJoinContext(); @@ -872,7 +913,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - cctx.database().readCheckpointAndRestoreMemory(startDescs); + cctx.exchange().exchangerBlockingSectionBegin(); + + try { + cctx.database().readCheckpointAndRestoreMemory(startDescs); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } } IgniteInternalFuture<?> cachesRegistrationFut = cctx.cache().startCachesOnLocalJoin(initialVersion(), locJoinCtx); @@ -923,6 +971,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte continue; grp.topology().beforeExchange(this, !centralizedAff && !forceAffReassignment, false); + + cctx.exchange().exchangerUpdateHeartbeat(); } } } @@ -953,29 +1003,50 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte boolean updateTop = exchId.topologyVersion().equals(grp.localStartVersion()); if (updateTop && clientTop != null) { - top.update(null, - clientTop.partitionMap(true), - clientTop.fullUpdateCounters(), - Collections.emptySet(), - null, - null); + cctx.exchange().exchangerBlockingSectionBegin(); + + try { + top.update(null, + clientTop.partitionMap(true), + clientTop.fullUpdateCounters(), + Collections.emptySet(), + null, + null); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } } } - top.updateTopologyVersion( - this, - events().discoveryCache(), - mvccCrd, - updSeq, - cacheGroupStopping(grp.groupId())); + cctx.exchange().exchangerBlockingSectionBegin(); + + try { + top.updateTopologyVersion( + this, + events().discoveryCache(), + mvccCrd, + updSeq, + cacheGroupStopping(grp.groupId())); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } } - for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { - top.updateTopologyVersion(this, - events().discoveryCache(), - mvccCrd, - -1, - cacheGroupStopping(top.groupId())); + cctx.exchange().exchangerBlockingSectionBegin(); + + try { + for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { + top.updateTopologyVersion(this, + events().discoveryCache(), + mvccCrd, + -1, + cacheGroupStopping(top.groupId())); + } + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); } } @@ -1004,7 +1075,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } try { - cctx.activate(); + cctx.exchange().exchangerBlockingSectionBegin(); + + try { + cctx.activate(); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } if (!cctx.kernalContext().clientNode()) { List<DynamicCacheDescriptor> startDescs = new ArrayList<>(); @@ -1017,12 +1095,26 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte startDescs.add(desc); } - cctx.database().readCheckpointAndRestoreMemory(startDescs); + cctx.exchange().exchangerBlockingSectionBegin(); + + try { + cctx.database().readCheckpointAndRestoreMemory(startDescs); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } } assert registerCachesFuture == null : "No caches registration should be scheduled before new caches have started."; - registerCachesFuture = cctx.affinity().onCacheChangeRequest(this, crd, exchActions); + cctx.exchange().exchangerBlockingSectionBegin(); + + try { + registerCachesFuture = cctx.affinity().onCacheChangeRequest(this, crd, exchActions); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } if (log.isInfoEnabled()) { log.info("Successfully activated caches [nodeId=" + cctx.localNodeId() + @@ -1038,8 +1130,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte exchangeLocE = e; if (crd) { - synchronized (mux) { - exchangeGlobalExceptions.put(cctx.localNodeId(), e); + cctx.exchange().exchangerBlockingSectionBegin(); + + try { + synchronized (mux) { + exchangeGlobalExceptions.put(cctx.localNodeId(), e); + } + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); } } } @@ -1051,6 +1150,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte ", topVer=" + initialVersion() + "]"); } + cctx.exchange().exchangerBlockingSectionBegin(); + try { cctx.kernalContext().dataStructures().onDeActivate(cctx.kernalContext()); @@ -1076,9 +1177,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte exchangeLocE = e; } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } } } else if (req.activate()) { + cctx.exchange().exchangerBlockingSectionBegin(); + // TODO: BLT changes on inactive cluster can't be handled easily because persistent storage hasn't been initialized yet. try { if (!forceAffReassignment) { @@ -1101,6 +1207,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte exchangeLocE = e; } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } } return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL; @@ -1116,6 +1225,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert !exchActions.clientOnlyExchange() : exchActions; + cctx.exchange().exchangerBlockingSectionBegin(); + try { assert registerCachesFuture == null : "No caches registration should be scheduled before new caches have started."; @@ -1133,6 +1244,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte exchangeGlobalExceptions.put(cctx.localNodeId(), exchangeLocE); } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL; } @@ -1213,10 +1327,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (crd != null) { assert !crd.isLocal() : crd; - if (!centralizedAff) - sendLocalPartitions(crd); + cctx.exchange().exchangerBlockingSectionBegin(); - initDone(); + try { + if (!centralizedAff) + sendLocalPartitions(crd); + + initDone(); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } return; } @@ -1226,13 +1347,22 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte GridAffinityAssignmentCache aff = grp.affinity(); aff.initialize(initialVersion(), aff.idealAssignment()); + + cctx.exchange().exchangerUpdateHeartbeat(); } } else onAllServersLeft(); } - onDone(initialVersion()); + cctx.exchange().exchangerBlockingSectionBegin(); + + try { + onDone(initialVersion()); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } } /** @@ -1247,13 +1377,27 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (grp.isLocal()) continue; - grp.preloader().onTopologyChanged(this); + cctx.exchange().exchangerBlockingSectionBegin(); + + try { + grp.preloader().onTopologyChanged(this); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } } - cctx.database().releaseHistoryForPreloading(); + cctx.exchange().exchangerBlockingSectionBegin(); + + try { + cctx.database().releaseHistoryForPreloading(); - // To correctly rebalance when persistence is enabled, it is necessary to reserve history within exchange. - partHistReserved = cctx.database().reserveHistoryForExchange(); + // To correctly rebalance when persistence is enabled, it is necessary to reserve history within exchange. + partHistReserved = cctx.database().reserveHistoryForExchange(); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } // Skipping wait on local join is available when all cluster nodes have the same protocol. boolean skipWaitOnLocalJoin = cctx.exchange().latch().canSkipJoiningNodes(initialVersion()) @@ -1288,7 +1432,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (topChanged) { // Partition release future is done so we can flush the write-behind store. - cacheCtx.store().forceFlush(); + cctx.exchange().exchangerBlockingSectionBegin(); + + try { + cacheCtx.store().forceFlush(); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } } } @@ -1296,7 +1447,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte In case of persistent store is enabled we first restore partitions presented on disk. We need to guarantee that there are no partition state changes logged to WAL before this callback to make sure that we correctly restored last actual states. */ - boolean restored = cctx.database().beforeExchange(this); + boolean restored; + + cctx.exchange().exchangerBlockingSectionBegin(); + + try { + restored = cctx.database().beforeExchange(this); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } // Pre-create missing partitions using current affinity. if (!exchCtx.mergeExchanges()) { @@ -1305,25 +1465,48 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte continue; // It is possible affinity is not initialized yet if node joins to cluster. - if (grp.affinity().lastVersion().topologyVersion() > 0) - grp.topology().beforeExchange(this, !centralizedAff && !forceAffReassignment, false); + if (grp.affinity().lastVersion().topologyVersion() > 0) { + cctx.exchange().exchangerBlockingSectionBegin(); + + try { + grp.topology().beforeExchange(this, !centralizedAff && !forceAffReassignment, false); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } + } } } // After all partitions have been restored and pre-created it's safe to make first checkpoint. - if (restored) - cctx.database().onStateRestored(); + if (restored) { + cctx.exchange().exchangerBlockingSectionBegin(); + + try { + cctx.database().onStateRestored(); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } + } changeWalModeIfNeeded(); - if (crd.isLocal()) { - if (remaining.isEmpty()) - onAllReceived(null); - } - else - sendPartitions(crd); + cctx.exchange().exchangerBlockingSectionBegin(); - initDone(); + try { + if (crd.isLocal()) { + if (remaining.isEmpty()) + onAllReceived(null); + } + else + sendPartitions(crd); + + initDone(); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } } /** @@ -1356,8 +1539,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private void changeWalModeIfNeeded() { WalStateAbstractMessage msg = firstWalMessage(); - if (msg != null) - cctx.walState().onProposeExchange(msg.exchangeMessage()); + if (msg != null) { + cctx.exchange().exchangerBlockingSectionBegin(); + + try { + cctx.walState().onProposeExchange(msg.exchangeMessage()); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } + } } /** @@ -1397,17 +1588,26 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private void waitPartitionRelease(boolean distributed, boolean doRollback) throws IgniteCheckedException { Latch releaseLatch = null; - // Wait for other nodes only on first phase. - if (distributed) - releaseLatch = cctx.exchange().latch().getOrCreate(DISTRIBUTED_LATCH_ID, initialVersion()); + IgniteInternalFuture<?> partReleaseFut; - IgniteInternalFuture<?> partReleaseFut = cctx.partitionReleaseFuture(initialVersion()); + cctx.exchange().exchangerBlockingSectionBegin(); - // Assign to class variable so it will be included into toString() method. - this.partReleaseFut = partReleaseFut; + try { + // Wait for other nodes only on first phase. + if (distributed) + releaseLatch = cctx.exchange().latch().getOrCreate(DISTRIBUTED_LATCH_ID, initialVersion()); + + partReleaseFut = cctx.partitionReleaseFuture(initialVersion()); - if (exchId.isLeft()) - cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion()); + // Assign to class variable so it will be included into toString() method. + this.partReleaseFut = partReleaseFut; + + if (exchId.isLeft()) + cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion()); + } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } if (log.isTraceEnabled()) log.trace("Before waiting for partition release future: " + this); @@ -1428,6 +1628,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte // Read txTimeoutOnPME from configuration after every iteration. long curTimeout = cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange(); + cctx.exchange().exchangerBlockingSectionBegin(); + try { // This avoids unnessesary waiting for rollback. partReleaseFut.get(curTimeout > 0 && !txRolledBack ? @@ -1454,6 +1656,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte throw e; } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } } long waitEnd = U.currentTimeMillis(); @@ -1477,6 +1682,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte dumpCnt = 0; while (true) { + cctx.exchange().exchangerBlockingSectionBegin(); + try { locksFut.get(waitTimeout, TimeUnit.MILLISECONDS); @@ -1507,6 +1714,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte U.dumpThreads(log); } } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } } if (releaseLatch == null) { @@ -1560,6 +1770,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte continue; grp.preloader().unwindUndeploys(); + + cctx.exchange().exchangerUpdateHeartbeat(); } cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion()); @@ -1972,12 +2184,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (super.onDone(res, err)) { afterLsnrCompleteFut.onDone(); - if (log.isDebugEnabled()) + if (log.isDebugEnabled()) { log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + - ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']'); - else if(log.isInfoEnabled()) - log.info("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange=" + shortInfo() + - ", topVer=" + topologyVersion() + ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']'); + ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']'); + } + else if (log.isInfoEnabled()) { + log.info("Completed partition exchange [localNode=" + cctx.localNodeId() + + ", exchange=" + shortInfo() + ", topVer=" + topologyVersion() + + ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']'); + } initFut.onDone(err == null); @@ -2041,6 +2256,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte (int)(cctx.kernalContext().config().getFailureDetectionTimeout() / 2)); for (;;) { + cctx.exchange().exchangerBlockingSectionBegin(); + try { registerCachesFut.get(timeout, TimeUnit.SECONDS); @@ -2055,6 +2272,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte "Probably disk is too busy or slow." + "[caches=" + cacheNames + "]"); } + finally { + cctx.exchange().exchangerBlockingSectionEnd(); + } } } } @@ -4010,6 +4230,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte grp.affinity().idealAssignment(affAssignment); grp.affinity().initialize(initialVersion(), affAssignment); + + cctx.exchange().exchangerUpdateHeartbeat(); } }
