#ignite-373: Get locks for partitions in removeAll().

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1ffe1158
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1ffe1158
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1ffe1158

Branch: refs/heads/ignite-373
Commit: 1ffe1158e95145bc7f0912d8e912403757add60c
Parents: cb09f7c
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Tue May 12 13:36:30 2015 +0300
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Tue May 12 13:36:30 2015 +0300

----------------------------------------------------------------------
 .../GridDistributedCacheAdapter.java            | 73 +++++++++++++-------
 1 file changed, 47 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ffe1158/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index b4417a0..29a806b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.datastreamer.*;
 import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
@@ -41,6 +42,7 @@ import java.util.*;
 import java.util.concurrent.*;
 
 import static org.apache.ignite.internal.GridClosureCallMode.*;
+import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
 
 /**
  * Distributed cache implementation.
@@ -142,10 +144,10 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
         try {
             AffinityTopologyVersion topVer;
 
-            boolean removed;
+            boolean removedAll;
 
             do {
-                removed = true;
+                removedAll = true;
 
                 topVer = ctx.affinity().affinityTopologyVersion();
 
@@ -156,17 +158,17 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
                 if (!nodes.isEmpty()) {
                     CacheOperationContext opCtx = 
ctx.operationContextPerCall();
 
-                    Collection<Object> results = 
ctx.closures().callAsyncNoFailover(BROADCAST,
+                    Collection<Boolean> results = 
ctx.closures().callAsyncNoFailover(BROADCAST,
                         Collections.singleton(new 
GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore())), 
nodes,
                         true).get();
 
-                    for (Object res : results) {
-                        if (res != null)
-                            removed = false;
+                    for (Boolean res : results) {
+                        if (res != null && !res)
+                            removedAll = false;
                     }
                 }
             }
-            while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) 
!= 0 || !removed);
+            while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) 
!= 0 || !removedAll);
         }
         catch (ClusterGroupEmptyCheckedException ignore) {
             if (log.isDebugEnabled())
@@ -241,7 +243,7 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
      * operation on a cache with the given name.
      */
     @GridInternal
-    private static class GlobalRemoveAllCallable<K,V> implements 
Callable<Object>, Externalizable {
+    private static class GlobalRemoveAllCallable<K,V> implements 
Callable<Boolean>, Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -279,11 +281,11 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
         /**
          * {@inheritDoc}
          */
-        @Override public Object call() throws Exception {
+        @Override public Boolean call() throws Exception {
             GridCacheAdapter<K, V> cacheAdapter = 
((IgniteKernal)ignite).context().cache().internalCache(cacheName);
 
             if (cacheAdapter == null)
-                return new Integer(-1);
+                return false;
 
             final GridCacheContext<K, V> ctx = cacheAdapter.context();
 
@@ -293,7 +295,7 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
 
             try {
                 if (!ctx.affinity().affinityTopologyVersion().equals(topVer))
-                    return new Integer(-1); // Ignore this remove request 
because remove request will be sent again.
+                    return false; // Ignore this remove request because remove 
request will be sent again.
 
                 GridDhtCacheAdapter<K, V> dht;
                 GridNearCacheAdapter<K, V> near = null;
@@ -313,24 +315,43 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
 
                     
dataLdr.receiver(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched());
 
-                    for (GridDhtLocalPartition locPart : 
dht.topology().currentLocalPartitions()) {
-                        if (!locPart.isEmpty() && locPart.primary(topVer)) {
-                            for (GridDhtCacheEntry o : locPart.entries()) {
-                                if (!o.obsoleteOrDeleted())
-                                    dataLdr.removeDataInternal(o.key());
-                            }
+                    for (GridDhtLocalPartition locPart : 
dht.topology().localPartitions()) {
+                        if (locPart.state() == EVICTED) {
+                            assert locPart.entries().size() == 0;
+
+                            continue;
                         }
-                    }
 
-                    Iterator<KeyCacheObject> it = 
dht.context().swap().offHeapKeyIterator(true, false, topVer);
+                        if (locPart == null || locPart.state() != OWNING || 
!locPart.reserve())
+                            return false;
+
+                        try {
+                            if (!locPart.isEmpty() && locPart.primary(topVer)) 
{
+                                for (GridDhtCacheEntry o : locPart.entries()) {
+                                    if 
(!ctx.affinity().belongs(ctx.localNode(), locPart.id(), 
dht.topology().topologyVersion()))
+                                        return false;
+
+                                    if (!o.obsoleteOrDeleted())
+                                        dataLdr.removeDataInternal(o.key());
+                                }
+                            }
 
-                    while (it.hasNext())
-                        dataLdr.removeDataInternal(it.next());
+                            GridCloseableIterator<Map.Entry<byte[], 
GridCacheSwapEntry>> iter =
+                                ctx.swap().iterator(locPart.id());
 
-                    it = dht.context().swap().swapKeyIterator(true, false, 
topVer);
+                            if (iter != null) {
+                                for (Map.Entry<byte[], GridCacheSwapEntry> e : 
iter) {
+                                    if 
(!ctx.affinity().belongs(ctx.localNode(), locPart.id(), 
dht.topology().topologyVersion()))
+                                        return false;
 
-                    while (it.hasNext())
-                        dataLdr.removeDataInternal(it.next());
+                                    
dataLdr.removeDataInternal(ctx.toCacheKeyObject(e.getKey()));
+                                }
+                            }
+                        }
+                        finally {
+                            locPart.release();
+                        }
+                    }
                 }
 
                 if (near != null) {
@@ -347,9 +368,9 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
             }
 
             if (!ctx.affinity().affinityTopologyVersion().equals(topVer))
-                return new Integer(-1);
+                return false;
 
-            return null;
+            return true;
         }
 
         /** {@inheritDoc} */

Reply via email to