IGNITE-2948 - Optimize usage of GridCacheConcurrentMap
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0b7470b3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0b7470b3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0b7470b3 Branch: refs/heads/ignite-2788 Commit: 0b7470b381cf6571577bef7a2dd3f1833dce61e5 Parents: bc98fa5 Author: Yakov Zhdanov <[email protected]> Authored: Tue Apr 26 15:07:28 2016 +0300 Committer: shtykh_roman <[email protected]> Committed: Fri May 13 16:11:15 2016 +0900 ---------------------------------------------------------------------- .../internal/benchmarks/model/IntValue.java | 19 +- .../internal/binary/BinaryObjectImpl.java | 14 + .../processors/cache/GridCacheAdapter.java | 448 ++-- .../cache/GridCacheAffinityManager.java | 10 +- .../cache/GridCacheClearAllRunnable.java | 2 +- .../cache/GridCacheConcurrentMap.java | 1996 +----------------- .../cache/GridCacheConcurrentMapImpl.java | 344 +++ .../processors/cache/GridCacheContext.java | 56 +- .../cache/GridCacheDeploymentManager.java | 2 +- .../processors/cache/GridCacheEntrySet.java | 113 - .../cache/GridCacheEvictionManager.java | 2 +- .../processors/cache/GridCacheKeySet.java | 104 - .../processors/cache/GridCacheMapEntry.java | 22 +- .../processors/cache/GridCacheProcessor.java | 1 - .../processors/cache/GridCacheProxyImpl.java | 2 +- .../processors/cache/GridCacheUtils.java | 1 - .../processors/cache/GridNoStorageCacheMap.java | 107 + .../processors/cache/IgniteInternalCache.java | 2 +- .../processors/cache/KeyCacheObject.java | 11 + .../processors/cache/KeyCacheObjectImpl.java | 32 +- .../binary/CacheObjectBinaryProcessorImpl.java | 24 +- .../GridDistributedCacheAdapter.java | 4 +- .../distributed/GridDistributedLockRequest.java | 19 +- .../GridDistributedTxRemoteAdapter.java | 1 - .../GridDistributedUnlockRequest.java | 16 +- .../dht/GridCachePartitionedConcurrentMap.java | 191 ++ .../dht/GridClientPartitionTopology.java | 7 - .../distributed/dht/GridDhtCacheAdapter.java | 77 +- .../distributed/dht/GridDhtCacheEntry.java | 25 +- .../distributed/dht/GridDhtLocalPartition.java | 152 +- .../distributed/dht/GridDhtLockRequest.java | 16 +- .../dht/GridDhtPartitionTopology.java | 7 - .../dht/GridDhtPartitionTopologyImpl.java | 281 ++- .../distributed/dht/GridDhtUnlockRequest.java | 15 +- .../distributed/dht/GridNoStorageCacheMap.java | 122 -- .../dht/GridPartitionedGetFuture.java | 2 +- .../dht/GridPartitionedSingleGetFuture.java | 2 +- .../dht/atomic/GridDhtAtomicCache.java | 75 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 22 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 6 +- .../dht/atomic/GridNearAtomicUpdateRequest.java | 31 +- .../dht/colocated/GridDhtColocatedCache.java | 13 +- .../dht/preloader/GridDhtPartitionDemander.java | 2 +- .../dht/preloader/GridDhtPartitionSupplier.java | 8 +- .../distributed/near/GridNearCacheAdapter.java | 94 +- .../distributed/near/GridNearGetFuture.java | 2 +- .../distributed/near/GridNearGetRequest.java | 45 +- .../distributed/near/GridNearLockRequest.java | 16 +- .../near/GridNearSingleGetRequest.java | 22 +- .../distributed/near/GridNearUnlockRequest.java | 21 +- .../processors/cache/local/GridLocalCache.java | 7 +- .../local/atomic/GridLocalAtomicCache.java | 24 +- .../cache/transactions/IgniteInternalTx.java | 1 - .../cache/transactions/IgniteTxAdapter.java | 1 - .../cache/transactions/IgniteTxEntry.java | 33 +- .../cacheobject/IgniteCacheObjectProcessor.java | 10 + .../IgniteCacheObjectProcessorImpl.java | 36 +- .../GridCacheAtomicReferenceImpl.java | 2 +- .../internal/util/PartitionedReadOnlySet.java | 71 + .../util/StripedCompositeReadWriteLock.java | 10 + .../ignite/internal/util/lang/GridFunc.java | 3 +- .../ignite/internal/visor/cache/VisorCache.java | 6 +- .../GridCachePreloadingEvictionsSelfTest.java | 4 +- .../cache/GridCacheTtlManagerSelfTest.java | 2 +- ...idCacheValueConsistencyAbstractSelfTest.java | 3 +- .../IgniteDynamicClientCacheStartSelfTest.java | 10 - .../IgniteTxStoreExceptionAbstractSelfTest.java | 8 +- .../GridCacheBinaryObjectsAbstractSelfTest.java | 2 +- ...actQueueFailoverDataConsistencySelfTest.java | 2 +- .../GridCacheQueueCleanupSelfTest.java | 4 +- .../GridCacheSequenceApiSelfAbstractTest.java | 37 - .../GridCacheSetAbstractSelfTest.java | 5 +- .../GridCacheSetFailoverAbstractSelfTest.java | 8 +- .../IgniteDataStructureUniqueNameTest.java | 3 +- .../IgnitePartitionedQueueNoBackupsTest.java | 6 +- .../IgnitePartitionedSetNoBackupsSelfTest.java | 6 +- .../distributed/dht/GridCacheDhtTestUtils.java | 9 +- .../near/GridCacheNearOneNodeSelfTest.java | 4 +- .../processors/igfs/IgfsAbstractSelfTest.java | 10 +- 79 files changed, 1958 insertions(+), 2975 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/model/IntValue.java ---------------------------------------------------------------------- diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/model/IntValue.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/model/IntValue.java index 0a97e36..45fbe79 100644 --- a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/model/IntValue.java +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/model/IntValue.java @@ -85,7 +85,24 @@ public class IntValue implements Externalizable, Binarylizable { } /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + IntValue value = (IntValue)o; + + return val == value.val; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val; + } + + /** {@inheritDoc} */ @Override public String toString() { return "Value [id=" + val + ']'; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java index 0997d6f..fa10de3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java @@ -72,6 +72,10 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern @GridDirectTransient private boolean detachAllowed; + /** */ + @GridDirectTransient + private int part = -1; + /** * For {@link Externalizable}. */ @@ -94,6 +98,16 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern } /** {@inheritDoc} */ + @Override public int partition() { + return part; + } + + /** {@inheritDoc} */ + @Override public void partition(int part) { + this.part = part; + } + + /** {@inheritDoc} */ @Override public byte cacheObjectType() { return TYPE_BINARY; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index d807e26..8c1a750 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -23,8 +23,8 @@ import java.io.InvalidObjectException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.ObjectStreamException; +import java.util.AbstractSet; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -101,13 +101,11 @@ import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFi import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; -import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridClosureException; -import org.apache.ignite.internal.util.lang.GridTriple; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.C2; @@ -132,7 +130,6 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteProductVersion; -import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.mxbean.CacheMetricsMXBean; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.resources.IgniteInstanceResource; @@ -146,8 +143,6 @@ import org.jsr166.LongAdder8; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_KEY_VALIDATION_DISABLED; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_RETRIES_COUNT; -import static org.apache.ignite.events.EventType.EVT_CACHE_ENTRY_CREATED; -import static org.apache.ignite.events.EventType.EVT_CACHE_ENTRY_DESTROYED; import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_LOAD; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; @@ -304,7 +299,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ @SuppressWarnings("OverriddenMethodCallDuringObjectConstruction") protected GridCacheAdapter(GridCacheContext<K, V> ctx, int startSize) { - this(ctx, new GridCacheConcurrentMap(ctx, startSize, null)); + this(ctx, null); } /** @@ -312,7 +307,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param map Concurrent map. */ @SuppressWarnings({"OverriddenMethodCallDuringObjectConstruction", "deprecation"}) - protected GridCacheAdapter(final GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) { + protected GridCacheAdapter(final GridCacheContext<K, V> ctx, @Nullable GridCacheConcurrentMap map) { assert ctx != null; this.ctx = ctx; @@ -393,6 +388,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** + * Increments map public size. + * @param e Map entry. + */ + public void incrementSize(GridCacheMapEntry e) { + map.incrementPublicSize(e); + } + + /** + * Decrements map public size. + * @param e Map entry. + */ + public void decrementSize(GridCacheMapEntry e) { + map.decrementPublicSize(e); + } + + /** * @return Context. */ @Override public GridCacheContext<K, V> context() { @@ -483,7 +494,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, (GridCacheAdapter<K1, V1>)this, opCtx); } - /** {@inheritDoc} */ @Nullable @Override public ExpiryPolicy expiry() { return null; @@ -541,13 +551,25 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** + * @return Entry factory. + */ + protected abstract GridCacheMapEntryFactory entryFactory(); + + /** * Starts this cache. Child classes should override this method * to provide custom start-up behavior. * * @throws IgniteCheckedException If start failed. */ public void start() throws IgniteCheckedException { - // No-op. + if (map == null) { + int initSize = ctx.config().getStartSize(); + + if (!isLocal()) + initSize /= ctx.affinity().partitions(); + + map = new GridCacheConcurrentMapImpl(ctx, entryFactory(), initSize); + } } /** @@ -687,7 +709,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V modes.backup = true; if (modes.heap) - its.add(iterator(map.entries0().iterator(), !ctx.keepBinary())); + its.add(iterator(map.entries().iterator(), !ctx.keepBinary())); } else if (modes.heap) { if (modes.near && ctx.isNear()) @@ -946,74 +968,28 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V GridCacheMapEntry cur = map.getEntry(key); if (cur == null || cur.obsolete()) { - GridTriple<GridCacheMapEntry> t = map.putEntryIfObsoleteOrAbsent( + cur = map.putEntryIfObsoleteOrAbsent( topVer, key, null, - create); - - cur = t.get1(); - - GridCacheEntryEx created = t.get2(); - GridCacheEntryEx doomed = t.get3(); - - if (doomed != null && ctx.events().isRecordable(EVT_CACHE_ENTRY_DESTROYED)) - // Event notification. - ctx.events().addEvent(doomed.partition(), - doomed.key(), - locNodeId, - (IgniteUuid)null, - null, - EVT_CACHE_ENTRY_DESTROYED, - null, - false, - null, - false, - null, - null, - null, - true); - - if (created != null) { - // Event notification. - if (ctx.events().isRecordable(EVT_CACHE_ENTRY_CREATED)) - ctx.events().addEvent(created.partition(), - created.key(), - locNodeId, - (IgniteUuid)null, - null, - EVT_CACHE_ENTRY_CREATED, - null, - false, - null, - false, - null, - null, - null, - true); - - if (touch) - ctx.evicts().touch( - cur, - topVer); - } + create, touch); } return cur; } /** - * @return Set of internal cached entry representations, excluding {@link GridCacheInternal} keys. + * @return Set of internal cached entry representations. */ - public Set<GridCacheEntryEx> entries() { - return map.entries0(); + public Iterable<? extends GridCacheEntryEx> entries() { + return allEntries(); } /** - * @return Set of internal cached entry representations, including {@link GridCacheInternal} keys. + * @return Set of internal cached entry representations. */ - public Set<GridCacheEntryEx> allEntries() { - return map.allEntries0(); + public Iterable<? extends GridCacheEntryEx> allEntries() { + return map.entries(); } /** {@inheritDoc} */ @@ -1022,8 +998,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public Set<Cache.Entry<K, V>> entrySetx(CacheEntryPredicate... filter) { - return map.entriesx(filter); + @Override public Set<Cache.Entry<K, V>> entrySetx(final CacheEntryPredicate... filter) { + return new EntrySet(map.entrySet(filter)); } /** {@inheritDoc} */ @@ -1033,22 +1009,57 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public Set<K> keySet() { - return map.keySet(); + return new KeySet(map.entrySet()); } /** {@inheritDoc} */ @Override public Set<K> keySetx() { - return map.keySetx(); + return keySet(); } /** {@inheritDoc} */ @Override public Set<K> primaryKeySet() { - return map.keySet(CU.cachePrimary(ctx.grid().affinity(ctx.name()), ctx.localNode())); + return new KeySet(map.entrySet(CU.cachePrimary(ctx.grid().affinity(ctx.name()), ctx.localNode()))); } /** {@inheritDoc} */ - @Override public Collection<V> values() { - return map.values(); + @Override public Iterable<V> values() { + return values((CacheEntryPredicate[])null); + } + + /** + * Collection of values cached on this node. You cannot modify this collection. + * <p> + * Iterator over this collection will not fail if collection was + * concurrently updated by another thread. This means that iterator may or + * may not return latest values depending on whether they were added before + * or after current iterator position. + * <p> + * NOTE: this operation is not distributed and returns only the values cached on this node. + * + * @param filter Filters. + * @return Collection of cached values. + */ + public Iterable<V> values(final CacheEntryPredicate... filter) { + return new Iterable<V>() { + @Override public Iterator<V> iterator() { + return new Iterator<V>() { + private final Iterator<? extends GridCacheEntryEx> it = entries().iterator(); + + @Override public boolean hasNext() { + return it.hasNext(); + } + + @Override public V next() { + return (V) it.next().wrap().getValue(); + } + + @Override public void remove() { + throw new UnsupportedOperationException("remove"); + } + }; + } + }; } /** @@ -1058,21 +1069,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V public void removeIfObsolete(KeyCacheObject key) { assert key != null; - GridCacheEntryEx entry = map.removeEntryIfObsolete(key); + GridCacheMapEntry entry = map.getEntry(key); - if (entry != null) { - assert entry.obsolete() : "Removed non-obsolete entry: " + entry; - - if (log.isDebugEnabled()) - log.debug("Removed entry from cache: " + entry); - - if (ctx.events().isRecordable(EVT_CACHE_ENTRY_DESTROYED)) - // Event notification. - ctx.events().addEvent(entry.partition(), entry.key(), locNodeId, (IgniteUuid)null, null, - EVT_CACHE_ENTRY_DESTROYED, null, false, null, false, null, null, null, false); - } - else if (log.isDebugEnabled()) - log.debug("Remove will not be done for key (obsolete entry got replaced or removed): " + key); + if (entry.obsolete()) + removeEntry(entry); } /** @@ -1088,7 +1088,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V int keySize = size(); int cnt = Math.min(keySize / CLEAR_ALL_SPLIT_THRESHOLD + (keySize % CLEAR_ALL_SPLIT_THRESHOLD != 0 ? 1 : 0), - Runtime.getRuntime().availableProcessors()); + Runtime.getRuntime().availableProcessors()); if (cnt == 0) cnt = 1; // Still perform cleanup since there could be entries in swap. @@ -1256,7 +1256,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param entry Removes entry from cache if currently mapped value is the same as passed. */ public void removeEntry(GridCacheEntryEx entry) { - map.removeEntry(entry); + boolean removed = map.removeEntry(entry); + + if (log.isDebugEnabled()) { + if (removed) + log.debug("Removed entry from cache: " + entry); + else + log.debug("Remove will not be done for key (entry got replaced or removed): " + entry.key()); + } } /** @@ -1399,7 +1406,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V T2<V, GridCacheVersion> t = (T2<V, GridCacheVersion>)get(key, !ctx.keepBinary(), true); - CacheEntry<K, V> val = t != null ? new CacheEntryImplEx<>(key, t.get1(), t.get2()): null; + CacheEntry<K, V> val = t != null ? new CacheEntryImplEx<>(key, t.get1(), t.get2()) : null; if (ctx.config().getInterceptor() != null) { V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null); @@ -1424,7 +1431,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<V> fut = getAsync(key, !ctx.keepBinary(), false); if (ctx.config().getInterceptor() != null) - fut = fut.chain(new CX1<IgniteInternalFuture<V>, V>() { + fut = fut.chain(new CX1<IgniteInternalFuture<V>, V>() { @Override public V applyx(IgniteInternalFuture<V> f) throws IgniteCheckedException { return (V)ctx.config().getInterceptor().onGet(key, f.get()); } @@ -1451,20 +1458,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<CacheEntry<K, V>> fr = fut.chain( new CX1<IgniteInternalFuture<T2<V, GridCacheVersion>>, CacheEntry<K, V>>() { - @Override public CacheEntry<K, V> applyx(IgniteInternalFuture<T2<V, GridCacheVersion>> f) - throws IgniteCheckedException { - T2<V, GridCacheVersion> t = f.get(); + @Override public CacheEntry<K, V> applyx(IgniteInternalFuture<T2<V, GridCacheVersion>> f) + throws IgniteCheckedException { + T2<V, GridCacheVersion> t = f.get(); - CacheEntry<K, V> val = t != null ? new CacheEntryImplEx<>(key, t.get1(), t.get2()) : null; - if (intercept) { - V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null); + CacheEntry<K, V> val = t != null ? new CacheEntryImplEx<>(key, t.get1(), t.get2()) : null; + if (intercept) { + V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null); - return new CacheEntryImplEx<>(key, val0, t != null ? t.get2() : null); + return new CacheEntryImplEx<>(key, val0, t != null ? t.get2() : null); + } + else + return val; } - else - return val; - } - }); + }); if (statsEnabled) fut.listen(new UpdateGetTimeStatClosure<T2<V, GridCacheVersion>>(metrics0(), start)); @@ -1516,7 +1523,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return res; } - /** {@inheritDoc} */ @Override public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys) { A.notNull(keys, "keys"); @@ -1541,7 +1547,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Collection<CacheEntry<K, V>>> getEntriesAsync( + @Override public IgniteInternalFuture<Collection<CacheEntry<K, V>>> getEntriesAsync( @Nullable final Collection<? extends K> keys) { A.notNull(keys, "keys"); @@ -1850,7 +1856,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final AffinityTopologyVersion topVer = tx == null ? (canRemap ? ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) : - tx.topologyVersion(); + tx.topologyVersion(); int keysSize = keys.size(); @@ -2091,8 +2097,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @throws IgniteCheckedException If failed. */ @Nullable public V getAndPut(final K key, final V val, @Nullable final CacheEntryPredicate filter) - throws IgniteCheckedException - { + throws IgniteCheckedException { boolean statsEnabled = ctx.config().isStatisticsEnabled(); long start = statsEnabled ? System.nanoTime() : 0L; @@ -2351,7 +2356,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<?> fut = asyncOp(new AsyncOp() { @Override public IgniteInternalFuture op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = - Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor); + Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor); return tx.invokeAsync(ctx, readyTopVer, invokeMap, args); } @@ -2578,7 +2583,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } }); - if(statsEnabled) + if (statsEnabled) fut.listen(new UpdatePutTimeStatClosure<V>(metrics0(), start)); return fut; @@ -2912,11 +2917,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V assert ctx.isLocal(); for (Iterator<KeyCacheObject> it = ctx.swap().offHeapKeyIterator(true, true, AffinityTopologyVersion.NONE); - it.hasNext(); ) + it.hasNext(); ) remove((K)it.next()); for (Iterator<KeyCacheObject> it = ctx.swap().swapKeyIterator(true, true, AffinityTopologyVersion.NONE); - it.hasNext(); ) + it.hasNext(); ) remove((K)it.next()); removeAll(keySet()); @@ -3436,7 +3441,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (ttl == CU.TTL_ZERO) return; - loadEntry(key, val, ver0, (IgniteBiPredicate<Object, Object>) p, topVer, replicate, ttl); + loadEntry(key, val, ver0, (IgniteBiPredicate<Object, Object>)p, topVer, replicate, ttl); } }, args); } @@ -3490,8 +3495,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> localLoadCacheAsync(final IgniteBiPredicate<K, V> p, - final Object[] args) - { + final Object[] args) { return ctx.closures().callLocalSafe( ctx.projectSafe(new Callable<Object>() { @Nullable @Override public Object call() throws IgniteCheckedException { @@ -3596,8 +3600,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ public void localLoad(Collection<? extends K> keys, @Nullable ExpiryPolicy plc) - throws IgniteCheckedException - { + throws IgniteCheckedException { final boolean replicate = ctx.isDrEnabled(); final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); @@ -4574,14 +4577,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @return Entry set. */ public Set<Cache.Entry<K, V>> entrySet(@Nullable CacheEntryPredicate... filter) { - return map.entries(filter); + return entrySetx(filter); } /** * @return Primary entry set. */ public Set<Cache.Entry<K, V>> primaryEntrySet() { - return map.entries(CU.cachePrimary(ctx.grid().affinity(ctx.name()), ctx.localNode())); + return new EntrySet(map.entrySet(CU.cachePrimary(ctx.grid().affinity(ctx.name()), ctx.localNode()))); } /** @@ -4612,15 +4615,23 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V String taskName, boolean deserializeBinary, boolean needVer) throws IgniteCheckedException { - return getAsync(key, - !ctx.config().isReadFromBackup(), - /*skip tx*/false, - null, - taskName, - deserializeBinary, - false, - /*can remap*/true, - needVer).get(); + try { + return getAsync(key, + !ctx.config().isReadFromBackup(), + /*skip tx*/false, + null, + taskName, + deserializeBinary, + false, + /*can remap*/true, + needVer).get(); + } + catch (IgniteException e) { + if (e.getCause(IgniteCheckedException.class) != null) + throw e.getCause(IgniteCheckedException.class); + else + throw e; + } } /** @@ -4741,7 +4752,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param deserializeBinary Deserialize binary flag. * @return Public API iterator. */ - protected Iterator<Cache.Entry<K, V>> iterator(final Iterator<GridCacheEntryEx> it, + protected Iterator<Cache.Entry<K, V>> iterator(final Iterator<? extends GridCacheEntryEx> it, final boolean deserializeBinary) { return new Iterator<Cache.Entry<K, V>>() { { @@ -4807,8 +4818,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ @Nullable private Cache.Entry<K, V> toCacheEntry(GridCacheEntryEx entry, boolean deserializeBinary) - throws IgniteCheckedException, GridCacheEntryRemovedException - { + throws IgniteCheckedException, GridCacheEntryRemovedException { CacheObject val = entry.innerGet( null, null, @@ -5497,7 +5507,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param loadArgs Arguments. * @param plc Policy. */ - private LoadCacheJob(String cacheName, AffinityTopologyVersion topVer, IgniteBiPredicate<K, V> p, Object[] loadArgs, + private LoadCacheJob(String cacheName, AffinityTopologyVersion topVer, IgniteBiPredicate<K, V> p, + Object[] loadArgs, ExpiryPolicy plc) { super(cacheName, topVer); @@ -5710,7 +5721,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * */ - static class LoadKeysCallable<K, V> implements IgniteCallable<Void>, Externalizable{ + static class LoadKeysCallable<K, V> implements IgniteCallable<Void>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -5831,8 +5842,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public void applyx(KeyCacheObject key, Object val, GridCacheVersion ver) - throws IgniteCheckedException - { + throws IgniteCheckedException { assert ver != null; if (p != null && !p.apply(key.<K>value(ctx.cacheObjectContext(), false), (V)val)) @@ -5914,8 +5924,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V private LoadCacheClosure(String cacheName, IgniteBiPredicate<K, V> p, Object[] args, - @Nullable ExpiryPolicy plc) - { + @Nullable ExpiryPolicy plc) { this.cacheName = cacheName; this.p = p; this.args = args; @@ -6380,4 +6389,173 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return null; } } + + /** + * Iterator implementation for KeySet. + */ + private final class KeySetIterator implements Iterator<K> { + /** Internal map entry iterator. */ + private final Iterator<GridCacheMapEntry> internalIterator; + + /** Keep binary flag. */ + private final boolean keepBinary; + + /** Current entry. */ + private GridCacheMapEntry current; + + /** + * Constructor. + * @param internalIterator Internal iterator. + * @param keepBinary Keep binary flag. + */ + private KeySetIterator(Iterator<GridCacheMapEntry> internalIterator, boolean keepBinary) { + this.internalIterator = internalIterator; + this.keepBinary = keepBinary; + } + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return internalIterator.hasNext(); + } + + /** {@inheritDoc} */ + @Override public K next() { + current = internalIterator.next(); + + return (K)ctx.unwrapBinaryIfNeeded(current.key(), keepBinary, true); + } + + /** {@inheritDoc} */ + @Override public void remove() { + if (current == null) + throw new IllegalStateException(); + + try { + GridCacheAdapter.this.getAndRemove((K)current.key()); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + + current = null; + } + } + + /** + * A wrapper over internal map that provides set semantics and constant-time contains() check. + */ + private final class KeySet extends AbstractSet<K> { + /** Internal entry set. */ + private final Set<GridCacheMapEntry> internalSet; + + /** Keep binary flag. */ + private final boolean keepBinary; + + /** + * Constructor + * @param internalSet Internal set. + */ + private KeySet(Set<GridCacheMapEntry> internalSet) { + this.internalSet = internalSet; + + CacheOperationContext opCtx = ctx.operationContextPerCall(); + + keepBinary = opCtx != null && opCtx.isKeepBinary(); + } + + /** {@inheritDoc} */ + @Override public Iterator<K> iterator() { + return new KeySetIterator(internalSet.iterator(), keepBinary); + } + + /** {@inheritDoc} */ + @Override public int size() { + return F.size(iterator()); + } + + /** {@inheritDoc} */ + @Override public boolean contains(Object o) { + GridCacheMapEntry entry = map.getEntry(ctx.toCacheKeyObject(o)); + + return entry != null && internalSet.contains(entry); + } + } + + /** + * Iterator implementation for EntrySet. + */ + private final class EntryIterator implements Iterator<Cache.Entry<K, V>> { + + /** Internal iterator. */ + private final Iterator<GridCacheMapEntry> internalIterator; + + /** Current entry. */ + private GridCacheMapEntry current; + + /** + * Constructor. + * @param internalIterator Internal iterator. + */ + private EntryIterator(Iterator<GridCacheMapEntry> internalIterator) { + this.internalIterator = internalIterator; + } + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return internalIterator.hasNext(); + } + + /** {@inheritDoc} */ + @Override public Cache.Entry<K, V> next() { + current = internalIterator.next(); + + return current.wrapLazyValue(); + } + + /** {@inheritDoc} */ + @Override public void remove() { + if (current == null) + throw new IllegalStateException(); + + try { + GridCacheAdapter.this.getAndRemove((K)current.wrapLazyValue().getKey()); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + + current = null; + } + } + + /** + * A wrapper over internal map that provides set semantics and constant-time contains() check. + */ + private final class EntrySet extends AbstractSet<Cache.Entry<K, V>> { + + /** Internal set. */ + private final Set<GridCacheMapEntry> internalSet; + + /** Constructor. */ + private EntrySet(Set<GridCacheMapEntry> internalSet) { + this.internalSet = internalSet; + } + + /** {@inheritDoc} */ + @Override public Iterator<Cache.Entry<K, V>> iterator() { + return new EntryIterator(internalSet.iterator()); + } + + /** {@inheritDoc} */ + @Override public int size() { + return F.size(iterator()); + } + + /** {@inheritDoc} */ + @Override public boolean contains(Object o) { + GridCacheMapEntry entry = map.getEntry(ctx.toCacheKeyObject(o)); + + return entry != null && internalSet.contains(entry); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index f1767e0..5e843dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -197,10 +197,18 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { public int partition(Object key) { GridAffinityAssignmentCache aff0 = aff; + if (key instanceof KeyCacheObject && ((KeyCacheObject)key).partition() != -1) + return ((KeyCacheObject)key).partition(); + if (aff0 == null) throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name()); - return affFunction.partition(affinityKey(key)); + int p = affFunction.partition(affinityKey(key)); + + if (key instanceof KeyCacheObject) + ((KeyCacheObject)key).partition(p); + + return p; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java index ffce82d..4f97e7b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java @@ -79,7 +79,7 @@ public class GridCacheClearAllRunnable<K, V> implements Runnable { /** {@inheritDoc} */ @Override public void run() { - Iterator<GridCacheEntryEx> iter = cache.map().stripedEntryIterator(id, totalCnt); + Iterator<? extends GridCacheEntryEx> iter = cache.entries().iterator(); while (iter.hasNext()) clearEntry(iter.next());
