Repository: ignite Updated Branches: refs/heads/master f80d3a95d -> 5dc4de83f
IGNITE-8019 Reduced code section size under CP read lock - Fixes #3685. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5dc4de83 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5dc4de83 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5dc4de83 Branch: refs/heads/master Commit: 5dc4de83f76f3d978f72fa0f9e0815a8cc4077e3 Parents: f80d3a9 Author: Ilya Lantukh <[email protected]> Authored: Mon Apr 2 11:11:46 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Mon Apr 2 11:11:46 2018 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 159 +++++++++---------- 1 file changed, 78 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5dc4de83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 734bbaa..337553b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -679,73 +679,80 @@ public class GridDhtPartitionDemander { GridCacheContext cctx = grp.sharedGroup() ? null : grp.singleCacheContext(); - // Preload. - for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) { - int p = e.getKey(); + ctx.database().checkpointReadLock(); - if (aff.get(p).contains(ctx.localNode())) { - GridDhtLocalPartition part = top.localPartition(p, topVer, true); + try { + // Preload. + for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) { + int p = e.getKey(); - assert part != null; + if (aff.get(p).contains(ctx.localNode())) { + GridDhtLocalPartition part = top.localPartition(p, topVer, true); - boolean last = supply.last().containsKey(p); + assert part != null; - if (part.state() == MOVING) { - boolean reserved = part.reserve(); + boolean last = supply.last().containsKey(p); - assert reserved : "Failed to reserve partition [igniteInstanceName=" + - ctx.igniteInstanceName() + ", grp=" + grp.cacheOrGroupName() + ", part=" + part + ']'; + if (part.state() == MOVING) { + boolean reserved = part.reserve(); - part.lock(); + assert reserved : "Failed to reserve partition [igniteInstanceName=" + + ctx.igniteInstanceName() + ", grp=" + grp.cacheOrGroupName() + ", part=" + part + ']'; - try { - // Loop through all received entries and try to preload them. - for (GridCacheEntryInfo entry : e.getValue().infos()) { - if (!preloadEntry(node, p, entry, topVer)) { - if (log.isDebugEnabled()) - log.debug("Got entries for invalid partition during " + - "preloading (will skip) [p=" + p + ", entry=" + entry + ']'); + part.lock(); - break; - } + try { + // Loop through all received entries and try to preload them. + for (GridCacheEntryInfo entry : e.getValue().infos()) { + if (!preloadEntry(node, p, entry, topVer)) { + if (log.isDebugEnabled()) + log.debug("Got entries for invalid partition during " + + "preloading (will skip) [p=" + p + ", entry=" + entry + ']'); - if (grp.sharedGroup() && (cctx == null || cctx.cacheId() != entry.cacheId())) - cctx = ctx.cacheContext(entry.cacheId()); + break; + } - if (cctx != null && cctx.statisticsEnabled()) - cctx.cache().metrics0().onRebalanceKeyReceived(); - } + if (grp.sharedGroup() && (cctx == null || cctx.cacheId() != entry.cacheId())) + cctx = ctx.cacheContext(entry.cacheId()); - // If message was last for this partition, - // then we take ownership. - if (last) { - top.own(part); + if (cctx != null && cctx.statisticsEnabled()) + cctx.cache().metrics0().onRebalanceKeyReceived(); + } - fut.partitionDone(nodeId, p); + // If message was last for this partition, + // then we take ownership. + if (last) { + top.own(part); - if (log.isDebugEnabled()) - log.debug("Finished rebalancing partition: " + part); + fut.partitionDone(nodeId, p); + + if (log.isDebugEnabled()) + log.debug("Finished rebalancing partition: " + part); + } + } + finally { + part.unlock(); + part.release(); } } - finally { - part.unlock(); - part.release(); + else { + if (last) + fut.partitionDone(nodeId, p); + + if (log.isDebugEnabled()) + log.debug("Skipping rebalancing partition (state is not MOVING): " + part); } } else { - if (last) - fut.partitionDone(nodeId, p); + fut.partitionDone(nodeId, p); if (log.isDebugEnabled()) - log.debug("Skipping rebalancing partition (state is not MOVING): " + part); + log.debug("Skipping rebalancing partition (it does not belong on current node): " + p); } } - else { - fut.partitionDone(nodeId, p); - - if (log.isDebugEnabled()) - log.debug("Skipping rebalancing partition (it does not belong on current node): " + p); - } + } + finally { + ctx.database().checkpointReadUnlock(); } // Only request partitions based on latest topology version. @@ -803,7 +810,7 @@ public class GridDhtPartitionDemander { GridCacheEntryInfo entry, AffinityTopologyVersion topVer ) throws IgniteCheckedException { - ctx.database().checkpointReadLock(); + assert ctx.database().checkpointLockIsHeldByThread(); try { GridCacheEntryEx cached = null; @@ -816,41 +823,34 @@ public class GridDhtPartitionDemander { if (log.isDebugEnabled()) log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + from.id() + ']'); - cctx.shared().database().checkpointReadLock(); - - try { - if (preloadPred == null || preloadPred.apply(entry)) { - if (cached.initialValue( - entry.value(), - entry.version(), - entry.ttl(), - entry.expireTime(), - true, - topVer, - cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE, - false - )) { - cctx.evicts().touch(cached, topVer); // Start tracking. - - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal()) - cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), - (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null, - false, null, null, null, true); - } - else { - cctx.evicts().touch(cached, topVer); // Start tracking. + if (preloadPred == null || preloadPred.apply(entry)) { + if (cached.initialValue( + entry.value(), + entry.version(), + entry.ttl(), + entry.expireTime(), + true, + topVer, + cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE, + false + )) { + cctx.evicts().touch(cached, topVer); // Start tracking. + + if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal()) + cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), + (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null, + false, null, null, null, true); + } + else { + cctx.evicts().touch(cached, topVer); // Start tracking. - if (log.isDebugEnabled()) - log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() + - ", part=" + p + ']'); - } + if (log.isDebugEnabled()) + log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() + + ", part=" + p + ']'); } - else if (log.isDebugEnabled()) - log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry); - } - finally { - cctx.shared().database().checkpointReadUnlock(); } + else if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry); } catch (GridCacheEntryRemovedException ignored) { if (log.isDebugEnabled()) @@ -871,9 +871,6 @@ public class GridDhtPartitionDemander { throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" + ctx.localNode() + ", node=" + from.id() + ", key=" + entry.key() + ", part=" + p + ']', e); } - finally { - ctx.database().checkpointReadUnlock(); - } return true; }
