# IGNITE-54-55 Refactoring removeAll().
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/58f9a9e1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/58f9a9e1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/58f9a9e1 Branch: refs/heads/ignite-164v2 Commit: 58f9a9e1fe51bbefb37513fc3a2b3cb02a2e0c0b Parents: f9fe11e Author: sevdokimov <[email protected]> Authored: Thu Feb 5 16:10:19 2015 +0300 Committer: sevdokimov <[email protected]> Committed: Thu Feb 5 16:10:19 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 138 ------------------- .../GridDistributedCacheAdapter.java | 133 ++++++++++++++++++ .../processors/cache/local/GridLocalCache.java | 6 + 3 files changed, 139 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58f9a9e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 98bb46c..631da77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -3388,35 +3388,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Override public void removeAll() throws IgniteCheckedException { - try { - long topVer; - - do { - topVer = ctx.affinity().affinityTopologyVersion(); - - // Send job to all nodes. - Collection<ClusterNode> nodes = ctx.grid().forCacheNodes(name()).nodes(); - - if (!nodes.isEmpty()) { - ctx.closures().callAsyncNoFailover(BROADCAST, - new GlobalRemoveAllCallable<>(name(), topVer), nodes, true).get(); - } - } while (ctx.affinity().affinityTopologyVersion() > topVer); - } - catch (ClusterGroupEmptyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("All remote nodes left while cache remove [cacheName=" + name() + "]"); - } - catch (ComputeTaskTimeoutCheckedException e) { - U.warn(log, "Timed out waiting for remote nodes to finish cache remove (consider increasing " + - "'networkTimeout' configuration property) [cacheName=" + name() + "]"); - - throw e; - } - } - - /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> removeAllAsync(final IgnitePredicate<CacheEntry<K, V>>... filter) { ctx.denyOnLocalRead(); @@ -5198,115 +5169,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** - * Internal callable which performs remove all primary key mappings - * operation on a cache with the given name. - */ - @GridInternal - private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>, Externalizable { - /** */ - private static final long REMOVE_ALL_BATCH_SIZE = 100L; - - /** */ - private static final long serialVersionUID = 0L; - - /** Cache name. */ - private String cacheName; - - /** Topology version. */ - private long topVer; - - /** Injected grid instance. */ - @IgniteInstanceResource - private Ignite ignite; - - /** - * Empty constructor for serialization. - */ - public GlobalRemoveAllCallable() { - // No-op. - } - - /** - * @param cacheName Cache name. - * @param topVer Topology version. - */ - private GlobalRemoveAllCallable(String cacheName, long topVer) { - this.cacheName = cacheName; - this.topVer = topVer; - } - - /** - * {@inheritDoc} - */ - @Override public Object call() throws Exception { - Collection<K> keys = new ArrayList<>(); - - final IgniteKernal grid = (IgniteKernal) ignite; - - final GridCache<K,V> cache = grid.cachex(cacheName); - - GridCacheAdapter<K, V> cacheAdapter = grid.context().cache().internalCache(cacheName); - - final GridCacheContext<K, V> ctx = cacheAdapter.context(); - - if (ctx.affinity().affinityTopologyVersion() != topVer) - return null; // Ignore this remove request because remove request will be sent again. - - if (cacheAdapter instanceof GridNearCacheAdapter) - cacheAdapter = ((GridNearCacheAdapter)cacheAdapter).dht(); - - if (cacheAdapter instanceof GridDhtCacheAdapter) { - GridDhtCacheAdapter<K, V> dht = (GridDhtCacheAdapter)cacheAdapter; - - for (GridDhtLocalPartition<K, V> locPart : dht.topology().currentLocalPartitions()) { - if (locPart.primary(topVer)) { - for (GridDhtCacheEntry<K, V> o : locPart.entries()) { - keys.add(o.key()); - - if (keys.size() >= REMOVE_ALL_BATCH_SIZE) { - cache.removeAll(keys); - - keys.clear(); - } - } - } - } - } - else { - assert cache != null; - - for (K k : cache.keySet()) { - if (ctx.affinity().primary(ctx.localNode(), k, topVer)) - keys.add(k); - - if (keys.size() >= REMOVE_ALL_BATCH_SIZE) { - cache.removeAll(keys); - - keys.clear(); - } - } - } - - if (!keys.isEmpty()) - cache.removeAll(keys); - - return null; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); - out.writeLong(topVer); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); - topVer = in.readLong(); - } - } - - /** * Internal callable which performs {@link org.apache.ignite.cache.CacheProjection#clearLocally()} * operation on a cache with the given name. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58f9a9e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index d88d2e8..d6e846d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -17,18 +17,29 @@ package org.apache.ignite.internal.processors.cache.distributed; +import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.compute.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; import java.io.*; import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.internal.GridClosureCallMode.*; /** * Distributed cache implementation. @@ -123,7 +134,129 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter } /** {@inheritDoc} */ + @Override public void removeAll() throws IgniteCheckedException { + try { + long topVer; + + do { + topVer = ctx.affinity().affinityTopologyVersion(); + + // Send job to all nodes. + Collection<ClusterNode> nodes = ctx.grid().forCacheNodes(name()).nodes(); + + if (!nodes.isEmpty()) { + ctx.closures().callAsyncNoFailover(BROADCAST, + new GlobalRemoveAllCallable<>(name(), topVer), nodes, true).get(); + } + } while (ctx.affinity().affinityTopologyVersion() > topVer); + } + catch (ClusterGroupEmptyCheckedException ignore) { + if (log.isDebugEnabled()) + log.debug("All remote nodes left while cache remove [cacheName=" + name() + "]"); + } + catch (ComputeTaskTimeoutCheckedException e) { + U.warn(log, "Timed out waiting for remote nodes to finish cache remove (consider increasing " + + "'networkTimeout' configuration property) [cacheName=" + name() + "]"); + + throw e; + } + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDistributedCacheAdapter.class, this, "super", super.toString()); } + + /** + * Internal callable which performs remove all primary key mappings + * operation on a cache with the given name. + */ + @GridInternal + private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>, Externalizable { + /** */ + private static final long REMOVE_ALL_BATCH_SIZE = 100L; + + /** */ + private static final long serialVersionUID = 0L; + + /** Cache name. */ + private String cacheName; + + /** Topology version. */ + private long topVer; + + /** Injected grid instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** + * Empty constructor for serialization. + */ + public GlobalRemoveAllCallable() { + // No-op. + } + + /** + * @param cacheName Cache name. + * @param topVer Topology version. + */ + private GlobalRemoveAllCallable(String cacheName, long topVer) { + this.cacheName = cacheName; + this.topVer = topVer; + } + + /** + * {@inheritDoc} + */ + @Override public Object call() throws Exception { + final IgniteKernal grid = (IgniteKernal) ignite; + + final GridCache<K,V> cache = grid.cachex(cacheName); + + GridCacheAdapter<K, V> cacheAdapter = grid.context().cache().internalCache(cacheName); + + final GridCacheContext<K, V> ctx = cacheAdapter.context(); + + if (ctx.affinity().affinityTopologyVersion() != topVer) + return null; // Ignore this remove request because remove request will be sent again. + + if (cacheAdapter instanceof GridNearCacheAdapter) + cacheAdapter = ((GridNearCacheAdapter)cacheAdapter).dht(); + + GridDhtCacheAdapter<K, V> dht = (GridDhtCacheAdapter)cacheAdapter; + + Collection<K> keys = new ArrayList<>(); + + for (GridDhtLocalPartition<K, V> locPart : dht.topology().currentLocalPartitions()) { + if (!locPart.isEmpty() && locPart.primary(topVer)) { + for (GridDhtCacheEntry<K, V> o : locPart.entries()) { + keys.add(o.key()); + + if (keys.size() >= REMOVE_ALL_BATCH_SIZE) { + cache.removeAll(keys); + + keys.clear(); + } + } + } + } + + if (!keys.isEmpty()) + cache.removeAll(keys); + + return null; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, cacheName); + out.writeLong(topVer); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cacheName = U.readString(in); + topVer = in.readLong(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58f9a9e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java index c09b1cc..b4bc3a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java @@ -191,6 +191,12 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void removeAll() throws IgniteCheckedException { + removeAll(keySet()); + } + + /** {@inheritDoc} */ @Override public void onDeferredDelete(GridCacheEntryEx<K, V> entry, GridCacheVersion ver) { assert false : "Should not be called"; }
