Repository: ignite
Updated Branches:
  refs/heads/master f80d3a95d -> 5dc4de83f


IGNITE-8019 Reduced code section size under CP read lock - Fixes #3685.

Signed-off-by: Alexey Goncharuk <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5dc4de83
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5dc4de83
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5dc4de83

Branch: refs/heads/master
Commit: 5dc4de83f76f3d978f72fa0f9e0815a8cc4077e3
Parents: f80d3a9
Author: Ilya Lantukh <[email protected]>
Authored: Mon Apr 2 11:11:46 2018 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Mon Apr 2 11:11:46 2018 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 159 +++++++++----------
 1 file changed, 78 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5dc4de83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 734bbaa..337553b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -679,73 +679,80 @@ public class GridDhtPartitionDemander {
 
             GridCacheContext cctx = grp.sharedGroup() ? null : 
grp.singleCacheContext();
 
-            // Preload.
-            for (Map.Entry<Integer, CacheEntryInfoCollection> e : 
supply.infos().entrySet()) {
-                int p = e.getKey();
+            ctx.database().checkpointReadLock();
 
-                if (aff.get(p).contains(ctx.localNode())) {
-                    GridDhtLocalPartition part = top.localPartition(p, topVer, 
true);
+            try {
+                // Preload.
+                for (Map.Entry<Integer, CacheEntryInfoCollection> e : 
supply.infos().entrySet()) {
+                    int p = e.getKey();
 
-                    assert part != null;
+                    if (aff.get(p).contains(ctx.localNode())) {
+                        GridDhtLocalPartition part = top.localPartition(p, 
topVer, true);
 
-                    boolean last = supply.last().containsKey(p);
+                        assert part != null;
 
-                    if (part.state() == MOVING) {
-                        boolean reserved = part.reserve();
+                        boolean last = supply.last().containsKey(p);
 
-                        assert reserved : "Failed to reserve partition 
[igniteInstanceName=" +
-                            ctx.igniteInstanceName() + ", grp=" + 
grp.cacheOrGroupName() + ", part=" + part + ']';
+                        if (part.state() == MOVING) {
+                            boolean reserved = part.reserve();
 
-                        part.lock();
+                            assert reserved : "Failed to reserve partition 
[igniteInstanceName=" +
+                                ctx.igniteInstanceName() + ", grp=" + 
grp.cacheOrGroupName() + ", part=" + part + ']';
 
-                        try {
-                            // Loop through all received entries and try to 
preload them.
-                            for (GridCacheEntryInfo entry : 
e.getValue().infos()) {
-                                if (!preloadEntry(node, p, entry, topVer)) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Got entries for invalid 
partition during " +
-                                            "preloading (will skip) [p=" + p + 
", entry=" + entry + ']');
+                            part.lock();
 
-                                    break;
-                                }
+                            try {
+                                // Loop through all received entries and try 
to preload them.
+                                for (GridCacheEntryInfo entry : 
e.getValue().infos()) {
+                                    if (!preloadEntry(node, p, entry, topVer)) 
{
+                                        if (log.isDebugEnabled())
+                                            log.debug("Got entries for invalid 
partition during " +
+                                                "preloading (will skip) [p=" + 
p + ", entry=" + entry + ']');
 
-                                if (grp.sharedGroup() && (cctx == null || 
cctx.cacheId() != entry.cacheId()))
-                                    cctx = ctx.cacheContext(entry.cacheId());
+                                        break;
+                                    }
 
-                                if (cctx != null && cctx.statisticsEnabled())
-                                    
cctx.cache().metrics0().onRebalanceKeyReceived();
-                            }
+                                    if (grp.sharedGroup() && (cctx == null || 
cctx.cacheId() != entry.cacheId()))
+                                        cctx = 
ctx.cacheContext(entry.cacheId());
 
-                            // If message was last for this partition,
-                            // then we take ownership.
-                            if (last) {
-                                top.own(part);
+                                    if (cctx != null && 
cctx.statisticsEnabled())
+                                        
cctx.cache().metrics0().onRebalanceKeyReceived();
+                                }
 
-                                fut.partitionDone(nodeId, p);
+                                // If message was last for this partition,
+                                // then we take ownership.
+                                if (last) {
+                                    top.own(part);
 
-                                if (log.isDebugEnabled())
-                                    log.debug("Finished rebalancing partition: 
" + part);
+                                    fut.partitionDone(nodeId, p);
+
+                                    if (log.isDebugEnabled())
+                                        log.debug("Finished rebalancing 
partition: " + part);
+                                }
+                            }
+                            finally {
+                                part.unlock();
+                                part.release();
                             }
                         }
-                        finally {
-                            part.unlock();
-                            part.release();
+                        else {
+                            if (last)
+                                fut.partitionDone(nodeId, p);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Skipping rebalancing partition 
(state is not MOVING): " + part);
                         }
                     }
                     else {
-                        if (last)
-                            fut.partitionDone(nodeId, p);
+                        fut.partitionDone(nodeId, p);
 
                         if (log.isDebugEnabled())
-                            log.debug("Skipping rebalancing partition (state 
is not MOVING): " + part);
+                            log.debug("Skipping rebalancing partition (it does 
not belong on current node): " + p);
                     }
                 }
-                else {
-                    fut.partitionDone(nodeId, p);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Skipping rebalancing partition (it does not 
belong on current node): " + p);
-                }
+            }
+            finally {
+                ctx.database().checkpointReadUnlock();
             }
 
             // Only request partitions based on latest topology version.
@@ -803,7 +810,7 @@ public class GridDhtPartitionDemander {
         GridCacheEntryInfo entry,
         AffinityTopologyVersion topVer
     ) throws IgniteCheckedException {
-        ctx.database().checkpointReadLock();
+        assert ctx.database().checkpointLockIsHeldByThread();
 
         try {
             GridCacheEntryEx cached = null;
@@ -816,41 +823,34 @@ public class GridDhtPartitionDemander {
                 if (log.isDebugEnabled())
                     log.debug("Rebalancing key [key=" + entry.key() + ", 
part=" + p + ", node=" + from.id() + ']');
 
-                cctx.shared().database().checkpointReadLock();
-
-                try {
-                    if (preloadPred == null || preloadPred.apply(entry)) {
-                        if (cached.initialValue(
-                            entry.value(),
-                            entry.version(),
-                            entry.ttl(),
-                            entry.expireTime(),
-                            true,
-                            topVer,
-                            cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE,
-                            false
-                        )) {
-                            cctx.evicts().touch(cached, topVer); // Start 
tracking.
-
-                            if 
(cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && 
!cached.isInternal())
-                                cctx.events().addEvent(cached.partition(), 
cached.key(), cctx.localNodeId(),
-                                    (IgniteUuid)null, null, 
EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
-                                    false, null, null, null, true);
-                        }
-                        else {
-                            cctx.evicts().touch(cached, topVer); // Start 
tracking.
+                if (preloadPred == null || preloadPred.apply(entry)) {
+                    if (cached.initialValue(
+                        entry.value(),
+                        entry.version(),
+                        entry.ttl(),
+                        entry.expireTime(),
+                        true,
+                        topVer,
+                        cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE,
+                        false
+                    )) {
+                        cctx.evicts().touch(cached, topVer); // Start tracking.
+
+                        if 
(cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && 
!cached.isInternal())
+                            cctx.events().addEvent(cached.partition(), 
cached.key(), cctx.localNodeId(),
+                                (IgniteUuid)null, null, 
EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
+                                false, null, null, null, true);
+                    }
+                    else {
+                        cctx.evicts().touch(cached, topVer); // Start tracking.
 
-                            if (log.isDebugEnabled())
-                                log.debug("Rebalancing entry is already in 
cache (will ignore) [key=" + cached.key() +
-                                    ", part=" + p + ']');
-                        }
+                        if (log.isDebugEnabled())
+                            log.debug("Rebalancing entry is already in cache 
(will ignore) [key=" + cached.key() +
+                                ", part=" + p + ']');
                     }
-                    else if (log.isDebugEnabled())
-                        log.debug("Rebalance predicate evaluated to false for 
entry (will ignore): " + entry);
-                }
-                finally {
-                    cctx.shared().database().checkpointReadUnlock();
                 }
+                else if (log.isDebugEnabled())
+                    log.debug("Rebalance predicate evaluated to false for 
entry (will ignore): " + entry);
             }
             catch (GridCacheEntryRemovedException ignored) {
                 if (log.isDebugEnabled())
@@ -871,9 +871,6 @@ public class GridDhtPartitionDemander {
             throw new IgniteCheckedException("Failed to cache rebalanced entry 
(will stop rebalancing) [local=" +
                 ctx.localNode() + ", node=" + from.id() + ", key=" + 
entry.key() + ", part=" + p + ']', e);
         }
-        finally {
-            ctx.database().checkpointReadUnlock();
-        }
 
         return true;
     }

Reply via email to