Repository: incubator-ignite Updated Branches: refs/heads/ignite-57 20f93d6c8 -> ac579f808
Ignite-54-55 Implemented basic removeAll() Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bcff8d8a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bcff8d8a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bcff8d8a Branch: refs/heads/ignite-57 Commit: bcff8d8a87509f49ed4065ad54f68822e7eba3fa Parents: 8970463 Author: Anton Vinogradov <[email protected]> Authored: Sun Jan 25 17:11:03 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Sun Jan 25 17:11:03 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/cache/CacheProjection.java | 17 +++ .../processors/cache/GridCacheAdapter.java | 134 +++++++++++++++++-- .../cache/GridCacheProjectionImpl.java | 5 + .../processors/cache/GridCacheProxyImpl.java | 12 ++ .../processors/cache/IgniteCacheProxy.java | 19 --- .../dht/atomic/GridDhtAtomicCache.java | 3 +- .../dataload/GridDataLoadCacheUpdaters.java | 12 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 19 +++ 8 files changed, 183 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff8d8a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java index 13f389d..7dd1f3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java @@ -1049,6 +1049,23 @@ public interface CacheProjection<K, V> extends Iterable<CacheEntry<K, V>> { public Set<K> keySet(); /** + * Set of keys cached on this node. 
You can remove elements from this set, but you cannot add elements + * to this set. All removal operations will be reflected on the cache itself. + * <p> + * Iterator over this set will not fail if set was concurrently updated + * by another thread. This means that iterator may or may not return latest + * keys depending on whether they were added before or after current + * iterator position. + * <p> + * NOTE: this operation is not distributed and returns only the keys cached on this node. + * + * @param filter Optional filter to check prior to getting key from cache. Note + * that filter is checked atomically together with get operation. + * @return Key set for this cache projection. + */ + public Set<K> keySet(@Nullable IgnitePredicate<CacheEntry<K, V>>... filter); + + /** * Set of keys for which this node is primary. * This set is dynamic and may change with grid topology changes. * Note that this set will contain mappings for all keys, even if their values are http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff8d8a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 3c0ed75..3cf786f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -78,6 +78,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** */ private static final long serialVersionUID = 0L; + /** removeAll() batch size. */ + private static final long REMOVE_ALL_BATCH_SIZE = 100L; + /** clearAll() split threshold. 
*/ public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000; @@ -3151,22 +3154,38 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public void removeAll(IgnitePredicate<CacheEntry<K, V>>... filter) throws IgniteCheckedException { - ctx.denyOnLocalRead(); + try { + if (F.isEmptyOrNulls(filter)) + filter = ctx.trueArray(); - if (F.isEmptyOrNulls(filter)) - filter = ctx.trueArray(); + long topVer; - final IgnitePredicate<CacheEntry<K, V>>[] p = filter; + do { + topVer = ctx.affinity().affinityTopologyVersion(); - syncOp(new SyncInOp(false) { - @Override public void inOp(IgniteTxLocalAdapter<K, V> tx) throws IgniteCheckedException { - tx.removeAllAsync(ctx, keySet(p), null, false, null).get(); - } + // Send job to all nodes. + Collection<ClusterNode> nodes = ctx.grid().forCache(name()).nodes(); - @Override public String toString() { - return "removeAll [filter=" + Arrays.toString(p) + ']'; - } - }); + IgniteFuture<Object> fut = null; + + if (!nodes.isEmpty()) + fut = ctx.closures().callAsyncNoFailover(BROADCAST, new GlobalRemoveAllCallable<>(name(), topVer, REMOVE_ALL_BATCH_SIZE, filter), nodes, true); + + if (fut != null) + fut.get(); + + } while (ctx.affinity().affinityTopologyVersion() > topVer); + } + catch (ClusterGroupEmptyException ignore) { + if (log.isDebugEnabled()) + log.debug("All remote nodes left while cache remove [cacheName=" + name() + "]"); + } + catch (ComputeTaskTimeoutException e) { + U.warn(log, "Timed out waiting for remote nodes to finish cache remove (consider increasing " + + "'networkTimeout' configuration property) [cacheName=" + name() + "]"); + + throw e; + } } /** {@inheritDoc} */ @@ -4952,6 +4971,97 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** + * Internal callable which performs remove all primary key mappings + * operation on a cache with the given name. 
+ */ + @GridInternal + private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Cache name. */ + private String cacheName; + + /** Topology version. */ + private long topVer; + + /** Remove batch size. */ + private long rmvBatchSz; + + /** Filters. */ + private IgnitePredicate<CacheEntry<K, V>>[] filter; + + /** Injected grid instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** + * Empty constructor for serialization. + */ + public GlobalRemoveAllCallable() { + // No-op. + } + + /** + * @param cacheName Cache name. + * @param topVer Topology version. + * @param rmvBatchSz Remove batch size. + * @param filter Filter. + */ + private GlobalRemoveAllCallable(String cacheName, long topVer, long rmvBatchSz, IgnitePredicate<CacheEntry<K, V>> ... filter) { + this.cacheName = cacheName; + this.topVer = topVer; + this.rmvBatchSz = rmvBatchSz; + this.filter = filter; + } + + /** + * {@inheritDoc} + */ + @Override public Object call() throws Exception { + Set<K> keys = new HashSet<>(); + + final GridKernal grid = (GridKernal) ignite; + + final GridCache<K,V> cache = grid.cachex(cacheName); + + final GridCacheContext<K, V> ctx = grid.context().cache().<K, V>internalCache(cacheName).context(); + + assert cache != null; + + for (K k : cache.keySet(filter)) { + if (ctx.affinity().primary(ctx.localNode(), k, topVer)) + keys.add(k); + if (keys.size() >= rmvBatchSz) { + cache.removeAll(keys); + + keys.clear(); + } + } + cache.removeAll(keys); + + return null; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, cacheName); + out.writeLong(topVer); + out.writeLong(rmvBatchSz); + out.writeObject(filter); + + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cacheName = U.readString(in); + topVer = in.readLong(); + 
rmvBatchSz = in.readLong(); + filter = (IgnitePredicate<CacheEntry<K, V>>[]) in.readObject(); + } + } + + /** * Internal callable which performs {@link org.apache.ignite.cache.CacheProjection#clearAll()} * operation on a cache with the given name. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff8d8a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java index 6e7ce4c..e969da6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java @@ -915,6 +915,11 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ + @Override public Set<K> keySet(@Nullable IgnitePredicate<CacheEntry<K, V>>... 
filter) { + return cache.keySet(filter); + } + + /** {@inheritDoc} */ @Override public Set<K> primaryKeySet() { return cache.primaryKeySet(entryFilter(true)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff8d8a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 44bdc3f..1248f98 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -993,6 +993,18 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ + @Override public Set<K> keySet(@Nullable IgnitePredicate<CacheEntry<K, V>>... 
filter) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return delegate.keySet(filter); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public Set<K> primaryKeySet() { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff8d8a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 94ee239..7d4499c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -620,25 +620,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements } } - /** - * @param keys Keys to remove. - */ - public void removeAll(Collection<? extends K> keys) { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - delegate.removeAll(keys); - } - finally { - gate.leave(prev); - } - } - catch (IgniteCheckedException e) { - throw cacheException(e); - } - } - /** {@inheritDoc} */ @Override public void removeAll() { // TODO IGNITE-1. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff8d8a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 683b7b9..324b1f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -513,7 +513,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public void removeAll(IgnitePredicate<CacheEntry<K, V>>[] filter) throws IgniteCheckedException { - removeAllAsync(filter).get(); + super.removeAll(filter); // TODO: IGNITE-?? Fix async cleanup + //removeAllAsync(filter).get(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff8d8a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java index 34f0b88..6562a88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java @@ -100,7 +100,7 @@ public class GridDataLoadCacheUpdaters { * @param putMap Entries to put. 
* @throws IgniteCheckedException If failed. */ - protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Collection<K> rmvCol, + protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Set<K> rmvCol, Map<K, V> putMap) throws IgniteCheckedException { assert rmvCol != null || putMap != null; @@ -154,7 +154,7 @@ public class GridDataLoadCacheUpdaters { assert !F.isEmpty(entries); Map<K, V> putAll = null; - Collection<K> rmvAll = null; + Set<K> rmvAll = null; for (Map.Entry<K, V> entry : entries) { K key = entry.getKey(); @@ -165,7 +165,7 @@ public class GridDataLoadCacheUpdaters { if (val == null) { if (rmvAll == null) - rmvAll = new ArrayList<>(); + rmvAll = new HashSet<>(); rmvAll.add(key); } @@ -195,7 +195,7 @@ public class GridDataLoadCacheUpdaters { assert !F.isEmpty(entries); Map<K, V> putAll = null; - Collection<K> rmvAll = null; + Set<K> rmvAll = null; for (Map.Entry<K, V> entry : entries) { K key = entry.getKey(); @@ -240,7 +240,7 @@ public class GridDataLoadCacheUpdaters { Map<Integer, Integer> partsCounts = new HashMap<>(); // Group by partition ID. 
- Map<Integer, Collection<K>> rmvPartMap = null; + Map<Integer, Set<K>> rmvPartMap = null; Map<Integer, Map<K, V>> putPartMap = null; Ignite ignite = cache.unwrap(Ignite.class); @@ -264,7 +264,7 @@ public class GridDataLoadCacheUpdaters { if (rmvPartMap == null) rmvPartMap = new HashMap<>(); - F.addIfAbsent(rmvPartMap, part, F.<K>newList()).add(key); + F.addIfAbsent(rmvPartMap, part, F.<K>newSet()).add(key); } else { if (putPartMap == null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff8d8a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index b0b0a5a..1fefec4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -2486,6 +2486,25 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract cache().removeAll(); assert cache().isEmpty(); + long entryCount = hugeRemoveAllEntryCount(); + + for (int i = 0; i < entryCount; i++) + cache().put(String.valueOf(i), i); + + for (int i = 0; i < entryCount; i++) + assertEquals(Integer.valueOf(i), cache().get(String.valueOf(i))); + + cache().removeAll(); + + for (int i = 0; i < entryCount; i++) + assertNull(cache().get(String.valueOf(i))); + } + + /** + * Provides the number of entries to be removed in the removeAll() test. + */ + protected long hugeRemoveAllEntryCount(){ + return 1000L; } /**
