ignite-2412 Do not call 'asyncOp' for synchronous operations (cherry picked from commit c530d47)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ad785cbd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ad785cbd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ad785cbd Branch: refs/heads/master Commit: ad785cbd192ca3f34d62bc2155f61a74f4962102 Parents: 28dab6e Author: sboikov <[email protected]> Authored: Fri Dec 16 19:23:29 2016 +0300 Committer: sboikov <[email protected]> Committed: Mon Dec 19 11:40:18 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 721 +++++++------------ .../processors/cache/IgniteCacheProxy.java | 8 - .../dht/atomic/GridDhtAtomicCache.java | 472 +++++++----- .../dht/colocated/GridDhtColocatedCache.java | 13 - .../distributed/near/GridNearAtomicCache.java | 65 +- .../local/atomic/GridLocalAtomicCache.java | 177 +---- ...nabledMultiNodeLongTxTimeoutFullApiTest.java | 2 +- ...lockMessageSystemPoolStarvationSelfTest.java | 14 +- 8 files changed, 597 insertions(+), 875 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ad785cbd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index a8d9f1d..5f0b8a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -248,16 +248,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** Grid configuration. 
*/ @GridToStringExclude - protected IgniteConfiguration gridCfg; + private IgniteConfiguration gridCfg; /** Cache metrics. */ protected CacheMetricsImpl metrics; /** Cache localMxBean. */ - protected CacheMetricsMXBean localMxBean; + private CacheMetricsMXBean locMxBean; /** Cache mxBean. */ - protected CacheMetricsMXBean clusterMxBean; + private CacheMetricsMXBean clusterMxBean; /** Logger. */ protected IgniteLogger log; @@ -288,9 +288,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** Asynchronous operations limit semaphore. */ private Semaphore asyncOpsSem; - /** */ - protected volatile boolean asyncToggled; - /** {@inheritDoc} */ @Override public String name() { return cacheCfg.getName(); @@ -334,7 +331,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V metrics = new CacheMetricsImpl(ctx); - localMxBean = new CacheLocalMetricsMXBeanImpl(this); + locMxBean = new CacheLocalMetricsMXBeanImpl(this); clusterMxBean = new CacheClusterMetricsMXBeanImpl(this); FileSystemConfiguration[] igfsCfgs = gridCfg.getFileSystemConfiguration(); @@ -367,18 +364,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** - * Toggles async flag if someone calls {@code withAsync()} - * on proxy and since that we have to properly handle all cache - * operations (sync and async) to put them in proper sequence. - * - * TODO: https://issues.apache.org/jira/browse/IGNITE-4393 - */ - void toggleAsync() { - if (!asyncToggled) - asyncToggled = true; - } - - /** * Prints memory stats. 
*/ public void printMemoryStats() { @@ -471,49 +456,49 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V public abstract GridCachePreloader preloader(); /** {@inheritDoc} */ - @Override public Affinity<K> affinity() { + @Override public final Affinity<K> affinity() { return aff; } /** {@inheritDoc} */ @SuppressWarnings({"unchecked", "RedundantCast"}) - @Override public <K1, V1> IgniteInternalCache<K1, V1> cache() { + @Override public final <K1, V1> IgniteInternalCache<K1, V1> cache() { return (IgniteInternalCache<K1, V1>)this; } /** {@inheritDoc} */ - @Override public GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) { + @Override public final GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) { CacheOperationContext opCtx = new CacheOperationContext(false, subjId, false, null, false, null); return new GridCacheProxyImpl<>(ctx, this, opCtx); } /** {@inheritDoc} */ - @Override public boolean skipStore() { + @Override public final boolean skipStore() { return false; } /** {@inheritDoc} */ - @Override public GridCacheProxyImpl<K, V> setSkipStore(boolean skipStore) { + @Override public final GridCacheProxyImpl<K, V> setSkipStore(boolean skipStore) { CacheOperationContext opCtx = new CacheOperationContext(true, null, false, null, false, null); return new GridCacheProxyImpl<>(ctx, this, opCtx); } /** {@inheritDoc} */ - @Override public <K1, V1> GridCacheProxyImpl<K1, V1> keepBinary() { + @Override public final <K1, V1> GridCacheProxyImpl<K1, V1> keepBinary() { CacheOperationContext opCtx = new CacheOperationContext(false, null, true, null, false, null); return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, (GridCacheAdapter<K1, V1>)this, opCtx); } /** {@inheritDoc} */ - @Nullable @Override public ExpiryPolicy expiry() { + @Nullable @Override public final ExpiryPolicy expiry() { return null; } /** {@inheritDoc} */ - @Override public GridCacheProxyImpl<K, V> withExpiryPolicy(ExpiryPolicy plc) { + @Override public final 
GridCacheProxyImpl<K, V> withExpiryPolicy(ExpiryPolicy plc) { assert !CU.isUtilityCache(ctx.name()); assert !CU.isAtomicsCache(ctx.name()); assert !CU.isMarshallerCache(ctx.name()); @@ -524,14 +509,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public IgniteInternalCache<K, V> withNoRetries() { + @Override public final IgniteInternalCache<K, V> withNoRetries() { CacheOperationContext opCtx = new CacheOperationContext(false, null, false, null, true, null); return new GridCacheProxyImpl<>(ctx, this, opCtx); } /** {@inheritDoc} */ - @Override public CacheConfiguration configuration() { + @Override public final CacheConfiguration configuration() { return ctx.config(); } @@ -630,7 +615,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public boolean isEmpty() { + @Override public final boolean isEmpty() { try { return localSize(CachePeekModes.ONHEAP_ONLY) == 0; } @@ -640,7 +625,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public boolean containsKey(K key) { + @Override public final boolean containsKey(K key) { try { return containsKeyAsync(key).get(); } @@ -667,7 +652,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public boolean containsKeys(Collection<? extends K> keys) { + @Override public final boolean containsKeys(Collection<? extends K> keys) { try { return containsKeysAsync(keys).get(); } @@ -677,7 +662,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> containsKeysAsync(final Collection<? extends K> keys) { + @Override public final IgniteInternalFuture<Boolean> containsKeysAsync(final Collection<? 
extends K> keys) { A.notNull(keys, "keys"); return getAllAsync( @@ -708,7 +693,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public Iterable<Cache.Entry<K, V>> localEntries(CachePeekMode[] peekModes) throws IgniteCheckedException { + @Override public final Iterable<Cache.Entry<K, V>> localEntries(CachePeekMode[] peekModes) throws IgniteCheckedException { assert peekModes != null; ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -765,7 +750,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @SuppressWarnings("ForLoopReplaceableByForEach") - @Nullable @Override public V localPeek(K key, + @Nullable @Override public final V localPeek(K key, CachePeekMode[] peekModes, @Nullable IgniteCacheExpiryPolicy plc) throws IgniteCheckedException { @@ -907,7 +892,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * * @param ldr Class loader to undeploy. */ - public void onUndeploy(ClassLoader ldr) { + public final void onUndeploy(ClassLoader ldr) { ctx.deploy().onUndeploy(ldr, context()); } @@ -916,7 +901,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param key Entry key. * @return Entry or <tt>null</tt>. */ - @Nullable public GridCacheEntryEx peekEx(KeyCacheObject key) { + @Nullable public final GridCacheEntryEx peekEx(KeyCacheObject key) { return entry0(key, ctx.affinity().affinityTopologyVersion(), false, false); } @@ -925,7 +910,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param key Entry key. * @return Entry or <tt>null</tt>. 
*/ - @Nullable public GridCacheEntryEx peekEx(Object key) { + @Nullable public final GridCacheEntryEx peekEx(Object key) { return entry0(ctx.toCacheKeyObject(key), ctx.affinity().affinityTopologyVersion(), false, false); } @@ -933,7 +918,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param key Entry key. * @return Entry (never {@code null}). */ - public GridCacheEntryEx entryEx(Object key) { + public final GridCacheEntryEx entryEx(Object key) { return entryEx(ctx.toCacheKeyObject(key), false); } @@ -941,7 +926,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param key Entry key. * @return Entry (never {@code null}). */ - public GridCacheEntryEx entryEx(KeyCacheObject key) { + public final GridCacheEntryEx entryEx(KeyCacheObject key) { return entryEx(key, false); } @@ -996,24 +981,24 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @return Set of internal cached entry representations. */ - public Iterable<? extends GridCacheEntryEx> entries() { + public final Iterable<? extends GridCacheEntryEx> entries() { return allEntries(); } /** * @return Set of internal cached entry representations. */ - public Iterable<? extends GridCacheEntryEx> allEntries() { + public final Iterable<? extends GridCacheEntryEx> allEntries() { return map.entries(); } /** {@inheritDoc} */ - @Override public Set<Cache.Entry<K, V>> entrySet() { + @Override public final Set<Cache.Entry<K, V>> entrySet() { return entrySet((CacheEntryPredicate[])null); } /** {@inheritDoc} */ - @Override public Set<Cache.Entry<K, V>> entrySetx(final CacheEntryPredicate... filter) { + @Override public final Set<Cache.Entry<K, V>> entrySetx(final CacheEntryPredicate... 
filter) { boolean keepBinary = ctx.keepBinary(); return new EntrySet(map.entrySet(filter), keepBinary); @@ -1025,17 +1010,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public Set<K> keySet() { + @Override public final Set<K> keySet() { return new KeySet(map.entrySet()); } /** {@inheritDoc} */ - @Override public Set<K> keySetx() { + @Override public final Set<K> keySetx() { return keySet(); } /** {@inheritDoc} */ - @Override public Set<K> primaryKeySet() { + @Override public final Set<K> primaryKeySet() { return new KeySet(map.entrySet(CU.cachePrimary(ctx.grid().affinity(ctx.name()), ctx.localNode()))); } @@ -1057,7 +1042,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param filter Filters. * @return Collection of cached values. */ - public Iterable<V> values(final CacheEntryPredicate... filter) { + public final Iterable<V> values(final CacheEntryPredicate... filter) { return new Iterable<V>() { @Override public Iterator<V> iterator() { return new Iterator<V>() { @@ -1083,12 +1068,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * * @param key Entry key. */ - public void removeIfObsolete(KeyCacheObject key) { + public final void removeIfObsolete(KeyCacheObject key) { assert key != null; GridCacheMapEntry entry = map.getEntry(key); - if (entry.obsolete()) + if (entry != null && entry.obsolete()) removeEntry(entry); } @@ -1272,11 +1257,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @param entry Removes entry from cache if currently mapped value is the same as passed. 
*/ - public void removeEntry(GridCacheEntryEx entry) { - boolean removed = map.removeEntry(entry); + public final void removeEntry(GridCacheEntryEx entry) { + boolean rmvd = map.removeEntry(entry); if (log.isDebugEnabled()) { - if (removed) + if (rmvd) log.debug("Removed entry from cache: " + entry); else log.debug("Remove will not be done for key (entry got replaced or removed): " + entry.key()); @@ -1311,7 +1296,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public V getForcePrimary(K key) throws IgniteCheckedException { + @Override public final V getForcePrimary(K key) throws IgniteCheckedException { String taskName = ctx.kernalContext().job().currentTaskName(); return getAllAsync( @@ -1328,7 +1313,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<V> getForcePrimaryAsync(final K key) { + @Override public final IgniteInternalFuture<V> getForcePrimaryAsync(final K key) { String taskName = ctx.kernalContext().job().currentTaskName(); return getAllAsync( @@ -1349,7 +1334,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - public V getTopologySafe(K key) throws IgniteCheckedException { + public final V getTopologySafe(K key) throws IgniteCheckedException { String taskName = ctx.kernalContext().job().currentTaskName(); return getAllAsync( @@ -1366,12 +1351,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Nullable @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) throws IgniteCheckedException { + @Nullable @Override public final Map<K, V> getAllOutTx(Set<? extends K> keys) throws IgniteCheckedException { return getAllOutTxAsync(keys).get(); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Map<K, V>> getAllOutTxAsync(Set<? 
extends K> keys) { + @Override public final IgniteInternalFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys) { String taskName = ctx.kernalContext().job().currentTaskName(); return getAllAsync(keys, @@ -1385,15 +1370,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V false); } - /** - * @param key Key. - * @param topVer Topology version. - * @return Entry. - */ - @Nullable protected GridCacheEntryEx entryExSafe(KeyCacheObject key, AffinityTopologyVersion topVer) { - return entryEx(key); - } - /** {@inheritDoc} */ @Nullable @Override public V get(K key) throws IgniteCheckedException { A.notNull(key, "key"); @@ -1533,14 +1509,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public Map<K, V> getAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException { + @Override public final Map<K, V> getAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException { A.notNull(keys, "keys"); boolean statsEnabled = ctx.config().isStatisticsEnabled(); long start = statsEnabled ? System.nanoTime() : 0L; - Map<K, V> map = getAll(keys, !ctx.keepBinary(), false); + Map<K, V> map = getAll0(keys, !ctx.keepBinary(), false); if (ctx.config().getInterceptor() != null) map = interceptGet(keys, map); @@ -1560,7 +1536,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V long start = statsEnabled ? System.nanoTime() : 0L; - Map<K, T2<V, GridCacheVersion>> map = (Map<K, T2<V, GridCacheVersion>>)getAll(keys, !ctx.keepBinary(), true); + Map<K, T2<V, GridCacheVersion>> map = (Map<K, T2<V, GridCacheVersion>>)getAll0(keys, !ctx.keepBinary(), true); Collection<CacheEntry<K, V>> res = new HashSet<>(); @@ -1875,7 +1851,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param needVer If {@code true} returns values as tuples containing value and version. * @return Future. 
*/ - public final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0( + protected final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0( @Nullable final Collection<KeyCacheObject> keys, final boolean readThrough, boolean checkTx, @@ -2138,7 +2114,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public V getAndPut(K key, V val) throws IgniteCheckedException { + @Override public final V getAndPut(K key, V val) throws IgniteCheckedException { return getAndPut(key, val, null); } @@ -2160,7 +2136,24 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keyCheck) validateCacheKey(key); - V prevVal = syncOp(new SyncOp<V>(true) { + V prevVal = getAndPut0(key, val, filter); + + if (statsEnabled) + metrics0().addPutAndGetTimeNanos(System.nanoTime() - start); + + return prevVal; + } + + /** + * @param key Key. + * @param val Value. + * @param filter Optional filter. + * @return Previous value. + * @throws IgniteCheckedException If failed. 
+ */ + protected V getAndPut0(final K key, final V val, @Nullable final CacheEntryPredicate filter) + throws IgniteCheckedException { + return syncOp(new SyncOp<V>(true) { @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { return (V)tx.putAsync(ctx, null, key, val, true, filter).get().value(); } @@ -2169,15 +2162,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return "put [key=" + key + ", val=" + val + ", filter=" + filter + ']'; } }); - - if (statsEnabled) - metrics0().addPutAndGetTimeNanos(System.nanoTime() - start); - - return prevVal; } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<V> getAndPutAsync(K key, V val) { + @Override public final IgniteInternalFuture<V> getAndPutAsync(K key, V val) { return getAndPutAsync(key, val, null); } @@ -2187,11 +2175,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param filter Filter. * @return Put operation future. */ - public IgniteInternalFuture<V> getAndPutAsync(K key, V val, @Nullable CacheEntryPredicate filter) { + protected final IgniteInternalFuture<V> getAndPutAsync(K key, V val, @Nullable CacheEntryPredicate filter) { final boolean statsEnabled = ctx.config().isStatisticsEnabled(); final long start = statsEnabled ? System.nanoTime() : 0L; + A.notNull(key, "key", val, "val"); + + if (keyCheck) + validateCacheKey(key); + IgniteInternalFuture<V> fut = getAndPutAsync0(key, val, filter); if (statsEnabled) @@ -2206,13 +2199,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param filter Optional filter. * @return Put operation future. 
*/ - public IgniteInternalFuture<V> getAndPutAsync0(final K key, final V val, - @Nullable final CacheEntryPredicate filter) { - A.notNull(key, "key", val, "val"); - - if (keyCheck) - validateCacheKey(key); - + public IgniteInternalFuture<V> getAndPutAsync0(final K key, + final V val, + @Nullable final CacheEntryPredicate filter) + { return asyncOp(new AsyncOp<V>() { @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { return tx.putAsync(ctx, readyTopVer, key, val, true, filter) @@ -2226,7 +2216,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public boolean put(final K key, final V val) throws IgniteCheckedException { + @Override public final boolean put(final K key, final V val) throws IgniteCheckedException { return put(key, val, null); } @@ -2250,7 +2240,26 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keyCheck) validateCacheKey(key); - Boolean stored = syncOp(new SyncOp<Boolean>(true) { + boolean stored = put0(key, val, filter); + + if (statsEnabled && stored) + metrics0().addPutTimeNanos(System.nanoTime() - start); + + return stored; + } + + /** + * @param key Key. + * @param val Value. + * @param filter Filter. + * @return {@code True} if optional filter passed and value was stored in cache, + * {@code false} otherwise. Note that this method will return {@code true} if filter is not + * specified. + * @throws IgniteCheckedException If put operation failed. 
+ */ + protected boolean put0(final K key, final V val, final CacheEntryPredicate filter) + throws IgniteCheckedException { + Boolean res = syncOp(new SyncOp<Boolean>(true) { @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { return tx.putAsync(ctx, null, key, val, false, filter).get().success(); } @@ -2260,10 +2269,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } }); - if (statsEnabled) - metrics0().addPutTimeNanos(System.nanoTime() - start); + assert res != null; - return stored; + return res; } /** {@inheritDoc} */ @@ -2305,7 +2313,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Nullable @Override public <T> EntryProcessorResult<T> invoke(@Nullable AffinityTopologyVersion topVer, + @Nullable @Override public final <T> EntryProcessorResult<T> invoke(@Nullable AffinityTopologyVersion topVer, K key, EntryProcessor<K, V, T> entryProcessor, Object... args) throws IgniteCheckedException { @@ -2538,7 +2546,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> putAsync(K key, V val) { + @Override public final IgniteInternalFuture<Boolean> putAsync(K key, V val) { return putAsync(key, val, null); } @@ -2548,9 +2556,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param filter Filter. * @return Put future. */ - public IgniteInternalFuture<Boolean> putAsync(K key, V val, @Nullable CacheEntryPredicate filter) { + public final IgniteInternalFuture<Boolean> putAsync(K key, V val, @Nullable CacheEntryPredicate filter) { A.notNull(key, "key", val, "val"); + if (keyCheck) + validateCacheKey(key); + final boolean statsEnabled = ctx.config().isStatisticsEnabled(); final long start = statsEnabled ? 
System.nanoTime() : 0L; @@ -2571,9 +2582,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ public IgniteInternalFuture<Boolean> putAsync0(final K key, final V val, @Nullable final CacheEntryPredicate filter) { - if (keyCheck) - validateCacheKey(key); - return asyncOp(new AsyncOp<Boolean>() { @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { return tx.putAsync(ctx, @@ -2598,267 +2606,82 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Nullable @Override public V getAndPutIfAbsent(final K key, final V val) throws IgniteCheckedException { - A.notNull(key, "key", val, "val"); - - if (keyCheck) - validateCacheKey(key); - - return syncOp(new SyncOp<V>(true) { - @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return (V)tx.putAsync(ctx, null, key, val, true, ctx.noVal()).get().value(); - } - - @Override public String toString() { - return "putIfAbsent [key=" + key + ", val=" + val + ']'; - } - }); + @Nullable @Override public final V getAndPutIfAbsent(final K key, final V val) throws IgniteCheckedException { + return getAndPut(key, val, ctx.noVal()); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<V> getAndPutIfAbsentAsync(final K key, final V val) { - final boolean statsEnabled = ctx.config().isStatisticsEnabled(); - - final long start = statsEnabled ? 
System.nanoTime() : 0L; - - A.notNull(key, "key", val, "val"); - - if (keyCheck) - validateCacheKey(key); - - IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() { - @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { - return tx.putAsync(ctx, readyTopVer, key, val, true, ctx.noVal()) - .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL); - } - - @Override public String toString() { - return "putIfAbsentAsync [key=" + key + ", val=" + val + ']'; - } - }); - - if (statsEnabled) - fut.listen(new UpdatePutTimeStatClosure<V>(metrics0(), start)); - - return fut; + @Override public final IgniteInternalFuture<V> getAndPutIfAbsentAsync(final K key, final V val) { + return getAndPutAsync(key, val, ctx.noVal()); } /** {@inheritDoc} */ - @Override public boolean putIfAbsent(final K key, final V val) throws IgniteCheckedException { - boolean statsEnabled = ctx.config().isStatisticsEnabled(); - - long start = statsEnabled ? System.nanoTime() : 0L; - - A.notNull(key, "key", val, "val"); - - if (keyCheck) - validateCacheKey(key); - - Boolean stored = syncOp(new SyncOp<Boolean>(true) { - @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return tx.putAsync(ctx, null, key, val, false, ctx.noVal()).get().success(); - } - - @Override public String toString() { - return "putxIfAbsent [key=" + key + ", val=" + val + ']'; - } - }); - - if (statsEnabled && stored) - metrics0().addPutTimeNanos(System.nanoTime() - start); - - return stored; + @Override public final boolean putIfAbsent(final K key, final V val) throws IgniteCheckedException { + return put(key, val, ctx.noVal()); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> putIfAbsentAsync(final K key, final V val) { - final boolean statsEnabled = ctx.config().isStatisticsEnabled(); - - final long start = statsEnabled ? 
System.nanoTime() : 0L; - - A.notNull(key, "key", val, "val"); - - if (keyCheck) - validateCacheKey(key); - - IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() { - @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { - return tx.putAsync(ctx, - readyTopVer, - key, - val, - false, - ctx.noVal()).chain( - (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); - } - - @Override public String toString() { - return "putxIfAbsentAsync [key=" + key + ", val=" + val + ']'; - } - }); - - if (statsEnabled) - fut.listen(new UpdatePutTimeStatClosure<Boolean>(metrics0(), start)); - - return fut; + @Override public final IgniteInternalFuture<Boolean> putIfAbsentAsync(final K key, final V val) { + return putAsync(key, val, ctx.noVal()); } /** {@inheritDoc} */ - @Nullable @Override public V getAndReplace(final K key, final V val) throws IgniteCheckedException { - A.notNull(key, "key", val, "val"); - - if (keyCheck) - validateCacheKey(key); - - return syncOp(new SyncOp<V>(true) { - @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return (V)tx.putAsync(ctx, null, key, val, true, ctx.hasVal()).get().value(); - } - - @Override public String toString() { - return "replace [key=" + key + ", val=" + val + ']'; - } - }); + @Nullable @Override public final V getAndReplace(final K key, final V val) throws IgniteCheckedException { + return getAndPut(key, val, ctx.hasVal()); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<V> getAndReplaceAsync(final K key, final V val) { - final boolean statsEnabled = ctx.config().isStatisticsEnabled(); - - final long start = statsEnabled ? 
System.nanoTime() : 0L; - - A.notNull(key, "key", val, "val"); - - if (keyCheck) - validateCacheKey(key); - - IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() { - @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { - return tx.putAsync(ctx, readyTopVer, key, val, true, ctx.hasVal()).chain( - (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL); - } - - @Override public String toString() { - return "replaceAsync [key=" + key + ", val=" + val + ']'; - } - }); - - if (statsEnabled) - fut.listen(new UpdatePutAndGetTimeStatClosure<V>(metrics0(), start)); - - return fut; + @Override public final IgniteInternalFuture<V> getAndReplaceAsync(final K key, final V val) { + return getAndPutAsync(key, val, ctx.hasVal()); } /** {@inheritDoc} */ - @Override public boolean replace(final K key, final V val) throws IgniteCheckedException { - A.notNull(key, "key", val, "val"); - - if (keyCheck) - validateCacheKey(key); - - return syncOp(new SyncOp<Boolean>(true) { - @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return tx.putAsync(ctx, null, key, val, false, ctx.hasVal()).get().success(); - } - - @Override public String toString() { - return "replacex [key=" + key + ", val=" + val + ']'; - } - }); + @Override public final boolean replace(final K key, final V val) throws IgniteCheckedException { + return put(key, val, ctx.hasVal()); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> replaceAsync(final K key, final V val) { - A.notNull(key, "key", val, "val"); - - if (keyCheck) - validateCacheKey(key); - - return asyncOp(new AsyncOp<Boolean>() { - @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { - return tx.putAsync(ctx, readyTopVer, key, val, false, ctx.hasVal()).chain( - (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG); - } - - @Override public String toString() 
{ - return "replacexAsync [key=" + key + ", val=" + val + ']'; - } - }); + @Override public final IgniteInternalFuture<Boolean> replaceAsync(final K key, final V val) { + return putAsync(key, val, ctx.hasVal()); } /** {@inheritDoc} */ - @Override public boolean replace(final K key, final V oldVal, final V newVal) throws IgniteCheckedException { - A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal"); - - if (keyCheck) - validateCacheKey(key); + @Override public final boolean replace(final K key, final V oldVal, final V newVal) throws IgniteCheckedException { + A.notNull(oldVal, "oldVal"); - return syncOp(new SyncOp<Boolean>(true) { - @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - // Register before hiding in the filter. - if (ctx.deploymentEnabled()) - ctx.deploy().registerClass(oldVal); - - return tx.putAsync(ctx, null, key, newVal, false, ctx.equalsVal(oldVal)).get() - .success(); - } - - @Override public String toString() { - return "replace [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']'; - } - }); + return put(key, newVal, ctx.equalsVal(oldVal)); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> replaceAsync(final K key, final V oldVal, final V newVal) { - final boolean statsEnabled = ctx.config().isStatisticsEnabled(); + A.notNull(oldVal, "oldVal"); - final long start = statsEnabled ? System.nanoTime() : 0L; - - A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal"); - - if (keyCheck) - validateCacheKey(key); - - IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() { - @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { - // Register before hiding in the filter. 
- if (ctx.deploymentEnabled()) { - try { - ctx.deploy().registerClass(oldVal); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } - } - - return tx.putAsync(ctx, readyTopVer, key, newVal, false, ctx.equalsVal(oldVal)).chain( - (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); - } - - @Override public String toString() { - return "replaceAsync [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']'; - } - }); - - if (statsEnabled) - fut.listen(new UpdatePutAndGetTimeStatClosure<Boolean>(metrics0(), start)); - - return fut; + return putAsync(key, newVal, ctx.equalsVal(oldVal)); } /** {@inheritDoc} */ @Override public void putAll(@Nullable final Map<? extends K, ? extends V> m) throws IgniteCheckedException { + if (F.isEmpty(m)) + return; + boolean statsEnabled = ctx.config().isStatisticsEnabled(); long start = statsEnabled ? System.nanoTime() : 0L; - if (F.isEmpty(m)) - return; - if (keyCheck) validateCacheKeys(m.keySet()); + putAll0(m); + + if (statsEnabled) + metrics0().addPutTimeNanos(System.nanoTime() - start); + } + + /** + * @param m Map. + * @throws IgniteCheckedException If failed. + */ + protected void putAll0(final Map<? extends K, ? extends V> m) throws IgniteCheckedException { syncOp(new SyncInOp(m.size() == 1) { @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { tx.putAllAsync(ctx, null, m, false).get(); @@ -2868,9 +2691,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return "putAll [map=" + m + ']'; } }); - - if (statsEnabled) - metrics0().addPutTimeNanos(System.nanoTime() - start); } /** {@inheritDoc} */ @@ -2881,6 +2701,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keyCheck) validateCacheKeys(m.keySet()); + return putAllAsync0(m); + } + + /** + * @param m Map. + * @return Future. + */ + protected IgniteInternalFuture<?> putAllAsync0(final Map<? extends K, ? 
extends V> m) { return asyncOp(new AsyncOp(m.keySet()) { @Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { return tx.putAllAsync(ctx, @@ -2906,11 +2734,25 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keyCheck) validateCacheKey(key); + V prevVal = getAndRemove0(key); + + if (statsEnabled) + metrics0().addRemoveAndGetTimeNanos(System.nanoTime() - start); + + return prevVal; + } + + /** + * @param key Key. + * @return Previous value. + * @throws IgniteCheckedException If failed. + */ + protected V getAndRemove0(final K key) throws IgniteCheckedException { final boolean keepBinary = ctx.keepBinary(); - V prevVal = syncOp(new SyncOp<V>(true) { + return syncOp(new SyncOp<V>(true) { @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - K key0 = keepBinary ? (K)ctx.toCacheKeyObject(key) : key; + K key0 = keepBinary ? (K) ctx.toCacheKeyObject(key) : key; V ret = tx.removeAllAsync(ctx, null, @@ -2920,9 +2762,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /*singleRmv*/false).get().value(); if (ctx.config().getInterceptor() != null) { - K key = keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key0, true, false) : key0; + K key = keepBinary ? 
(K) ctx.unwrapBinaryIfNeeded(key0, true, false) : key0; - return (V)ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2(); + return (V) ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2(); } return ret; @@ -2932,11 +2774,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return "remove [key=" + key + ']'; } }); - - if (statsEnabled) - metrics0().addRemoveAndGetTimeNanos(System.nanoTime() - start); - - return prevVal; } /** {@inheritDoc} */ @@ -2950,7 +2787,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keyCheck) validateCacheKey(key); - IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() { + IgniteInternalFuture<V> fut = getAndRemoveAsync0(key); + + if (statsEnabled) + fut.listen(new UpdateRemoveTimeStatClosure<V>(metrics0(), start)); + + return fut; + } + + /** + * @param key Key. + * @return Future. + */ + protected IgniteInternalFuture<V> getAndRemoveAsync0(final K key) { + return asyncOp(new AsyncOp<V>() { @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { // TODO should we invoke interceptor here? return tx.removeAllAsync(ctx, @@ -2965,11 +2815,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return "removeAsync [key=" + key + ']'; } }); - - if (statsEnabled) - fut.listen(new UpdateRemoveTimeStatClosure<V>(metrics0(), start)); - - return fut; } /** {@inheritDoc} */ @@ -3002,6 +2847,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keyCheck) validateCacheKeys(keys); + removeAll0(keys); + + if (statsEnabled) + metrics0().addRemoveTimeNanos(System.nanoTime() - start); + } + + /** + * @param keys Keys. + * @throws IgniteCheckedException If failed. + */ + protected void removeAll0(final Collection<? 
extends K> keys) throws IgniteCheckedException { syncOp(new SyncInOp(keys.size() == 1) { @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { tx.removeAllAsync(ctx, @@ -3016,24 +2872,34 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return "removeAll [keys=" + keys + ']'; } }); - - if (statsEnabled) - metrics0().addRemoveTimeNanos(System.nanoTime() - start); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> removeAllAsync(@Nullable final Collection<? extends K> keys) { + if (F.isEmpty(keys)) + return new GridFinishedFuture<Object>(); + final boolean statsEnabled = ctx.config().isStatisticsEnabled(); final long start = statsEnabled ? System.nanoTime() : 0L; - if (F.isEmpty(keys)) - return new GridFinishedFuture<Object>(); - if (keyCheck) validateCacheKeys(keys); - IgniteInternalFuture<Object> fut = asyncOp(new AsyncOp(keys) { + IgniteInternalFuture<Object> fut = removeAllAsync0(keys); + + if (statsEnabled) + fut.listen(new UpdateRemoveTimeStatClosure<>(metrics0(), start)); + + return fut; + } + + /** + * @param keys Keys. + * @return Future. + */ + protected IgniteInternalFuture<Object> removeAllAsync0(final Collection<? extends K> keys) { + return asyncOp(new AsyncOp(keys) { @Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { return tx.removeAllAsync(ctx, readyTopVer, @@ -3047,15 +2913,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return "removeAllAsync [keys=" + keys + ']'; } }); - - if (statsEnabled) - fut.listen(new UpdateRemoveTimeStatClosure<>(metrics0(), start)); - - return fut; } /** {@inheritDoc} */ @Override public boolean remove(final K key) throws IgniteCheckedException { + return remove(key, (CacheEntryPredicate)null); + } + + /** + * @param key Key. + * @param filter Filter. + * @return {@code True} if entry was removed. + * @throws IgniteCheckedException If failed. 
+ */ + public boolean remove(final K key, @Nullable CacheEntryPredicate filter) throws IgniteCheckedException { boolean statsEnabled = ctx.config().isStatisticsEnabled(); long start = statsEnabled ? System.nanoTime() : 0L; @@ -3065,13 +2936,27 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keyCheck) validateCacheKey(key); - boolean rmv = syncOp(new SyncOp<Boolean>(true) { + boolean rmv = remove0(key, filter); + + if (statsEnabled && rmv) + metrics0().addRemoveTimeNanos(System.nanoTime() - start); + + return rmv; + } + + /** + * @param key Key. + * @return {@code True} if entry was removed. + * @throws IgniteCheckedException If failed. + */ + protected boolean remove0(final K key, final CacheEntryPredicate filter) throws IgniteCheckedException { + Boolean res = syncOp(new SyncOp<Boolean>(true) { @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { return tx.removeAllAsync(ctx, null, Collections.singletonList(key), /*retval*/false, - null, + filter, /*singleRmv*/true).get().success(); } @@ -3080,10 +2965,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } }); - if (statsEnabled && rmv) - metrics0().addRemoveTimeNanos(System.nanoTime() - start); + assert res != null; - return rmv; + return res; } /** {@inheritDoc} */ @@ -3108,7 +2992,21 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keyCheck) validateCacheKey(key); - IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() { + IgniteInternalFuture<Boolean> fut = removeAsync0(key, filter); + + if (statsEnabled) + fut.listen(new UpdateRemoveTimeStatClosure<Boolean>(metrics0(), start)); + + return fut; + } + + /** + * @param key Key. + * @param filter Filter. + * @return Future. 
+ */ + protected IgniteInternalFuture<Boolean> removeAsync0(final K key, @Nullable final CacheEntryPredicate filter) { + return asyncOp(new AsyncOp<Boolean>() { @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { return tx.removeAllAsync(ctx, readyTopVer, @@ -3123,11 +3021,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return "removeAsync [key=" + key + ", filter=" + filter + ']'; } }); - - if (statsEnabled) - fut.listen(new UpdateRemoveTimeStatClosure<Boolean>(metrics0(), start)); - - return fut; } /** {@inheritDoc} */ @@ -3169,86 +3062,21 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public boolean remove(final K key, final V val) throws IgniteCheckedException { - boolean statsEnabled = ctx.config().isStatisticsEnabled(); - - long start = statsEnabled ? System.nanoTime() : 0L; - - A.notNull(key, "key", val, "val"); - - if (keyCheck) - validateCacheKey(key); - - boolean rmv = syncOp(new SyncOp<Boolean>(true) { - @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - // Register before hiding in the filter. 
- if (ctx.deploymentEnabled()) - ctx.deploy().registerClass(val); - - return tx.removeAllAsync(ctx, - null, - Collections.singletonList(key), - /*retval*/false, - ctx.equalsVal(val), - /*singleRmv*/false).get().success(); - } - - @Override public String toString() { - return "remove [key=" + key + ", val=" + val + ']'; - } - }); + @Override public final boolean remove(final K key, final V val) throws IgniteCheckedException { + A.notNull(val, "val"); - if (statsEnabled && rmv) - metrics0().addRemoveTimeNanos(System.nanoTime() - start); - - return rmv; + return remove(key, ctx.equalsVal(val)); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> removeAsync(final K key, final V val) { - final boolean statsEnabled = ctx.config().isStatisticsEnabled(); - - final long start = statsEnabled ? System.nanoTime() : 0L; - - A.notNull(key, "key", val, "val"); - - if (keyCheck) - validateCacheKey(key); - - IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() { - @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { - // Register before hiding in the filter. 
- if (ctx.deploymentEnabled()) { - try { - ctx.deploy().registerClass(val); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } - } - - return tx.removeAllAsync(ctx, - readyTopVer, - Collections.singletonList(key), - /*retval*/false, - ctx.equalsVal(val), - /*singleRmv*/false).chain( - (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); - } + @Override public final IgniteInternalFuture<Boolean> removeAsync(final K key, final V val) { + A.notNull(val, "val"); - @Override public String toString() { - return "removeAsync [key=" + key + ", val=" + val + ']'; - } - }); - - if (statsEnabled) - fut.listen(new UpdateRemoveTimeStatClosure<Boolean>(metrics0(), start)); - - return fut; + return removeAsync(key, ctx.equalsVal(val)); } /** {@inheritDoc} */ - @Override public CacheMetrics clusterMetrics() { + @Override public final CacheMetrics clusterMetrics() { return clusterMetrics(ctx.grid().cluster().forCacheNodes(ctx.name())); } @@ -3277,7 +3105,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public CacheMetricsMXBean localMxBean() { - return localMxBean; + return locMxBean; } /** {@inheritDoc} */ @@ -4607,9 +4435,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @return Failed future if waiting was interrupted. */ @Nullable protected <T> IgniteInternalFuture<T> asyncOpAcquire() { - if (!asyncToggled) - return null; - try { if (asyncOpsSem != null) asyncOpsSem.acquire(); @@ -4627,8 +4452,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Releases asynchronous operations permit, if limited. 
*/ - protected void asyncOpRelease() { - if (asyncOpsSem != null && asyncToggled) + private void asyncOpRelease() { + if (asyncOpsSem != null) asyncOpsSem.release(); } @@ -4793,12 +4618,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @return Cached value. * @throws IgniteCheckedException If failed. */ - @Nullable public V get(K key, boolean deserializeBinary, final boolean needVer) throws IgniteCheckedException { - checkJta(); - + @Nullable public final V get(K key, boolean deserializeBinary, final boolean needVer) throws IgniteCheckedException { String taskName = ctx.kernalContext().job().currentTaskName(); - return get(key, taskName, deserializeBinary, needVer); + return get0(key, taskName, deserializeBinary, needVer); } /** @@ -4809,11 +4632,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @return Cached value. * @throws IgniteCheckedException If failed. */ - protected V get( + protected V get0( final K key, String taskName, boolean deserializeBinary, boolean needVer) throws IgniteCheckedException { + checkJta(); + try { return getAsync(key, !ctx.config().isReadFromBackup(), @@ -4867,7 +4692,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @return Map of cached values. * @throws IgniteCheckedException If read failed. */ - public Map<K, V> getAll(Collection<? extends K> keys, boolean deserializeBinary, + protected Map<K, V> getAll0(Collection<? extends K> keys, boolean deserializeBinary, boolean needVer) throws IgniteCheckedException { checkJta(); @@ -4922,7 +4747,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param key Cache key. * @throws IllegalArgumentException If validation fails. 
*/ - protected void validateCacheKey(Object key) { + protected final void validateCacheKey(Object key) { if (keyCheck) { CU.validateCacheKey(key); @@ -4937,7 +4762,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param keys Cache keys. * @throws IgniteException If validation fails. */ - protected void validateCacheKeys(Iterable<?> keys) { + protected final void validateCacheKeys(Iterable<?> keys) { if (keys == null) return; @@ -4958,7 +4783,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param deserializeBinary Deserialize binary flag. * @return Public API iterator. */ - protected Iterator<Cache.Entry<K, V>> iterator(final Iterator<? extends GridCacheEntryEx> it, + protected final Iterator<Cache.Entry<K, V>> iterator(final Iterator<? extends GridCacheEntryEx> it, final boolean deserializeBinary) { return new Iterator<Cache.Entry<K, V>>() { { @@ -5276,7 +5101,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param plc Explicitly specified expiry policy for cache operation. * @return Expiry policy wrapper. */ - @Nullable public IgniteCacheExpiryPolicy expiryPolicy(@Nullable ExpiryPolicy plc) { + @Nullable public final IgniteCacheExpiryPolicy expiryPolicy(@Nullable ExpiryPolicy plc) { if (plc == null) plc = ctx.expiry(); @@ -5401,7 +5226,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param opCtx Operation context. * @return Operation future. 
*/ - protected IgniteInternalFuture<T> waitTopologyFuture(IgniteInternalFuture<?> topFut, + private IgniteInternalFuture<T> waitTopologyFuture(IgniteInternalFuture<?> topFut, final AffinityTopologyVersion topVer, final IgniteTxLocalAdapter tx, final CacheOperationContext opCtx) { http://git-wip-us.apache.org/repos/asf/ignite/blob/ad785cbd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index b9e6e82..f87fa1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -334,14 +334,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ - @Override public IgniteCache<K, V> withAsync() { - if (delegate instanceof GridCacheAdapter) - ((GridCacheAdapter)delegate).toggleAsync(); - - return super.withAsync(); - } - - /** {@inheritDoc} */ @Override public IgniteCache<K, V> withSkipStore() { return skipStore(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/ad785cbd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 0e60ff4..a67a903 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -139,7 +139,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos; /** Pending */ - private GridDeferredAckMessageSender deferredUpdateMessageSender; + private GridDeferredAckMessageSender deferredUpdateMsgSnd; /** */ private GridNearAtomicCache<K, V> near; @@ -174,6 +174,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ + @Override protected void checkJta() throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ @Override public boolean isDhtAtomic() { return true; } @@ -235,7 +240,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public void start() throws IgniteCheckedException { super.start(); - deferredUpdateMessageSender = new GridDeferredAckMessageSender(ctx.time(), ctx.closures()) { + deferredUpdateMsgSnd = new GridDeferredAckMessageSender(ctx.time(), ctx.closures()) { @Override public int getTimeout() { return DEFERRED_UPDATE_RESPONSE_TIMEOUT; } @@ -447,7 +452,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public void stop() { - deferredUpdateMessageSender.stop(); + deferredUpdateMsgSnd.stop(); } /** @@ -463,7 +468,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override protected V get(K key, String taskName, boolean deserializeBinary, boolean needVer) + @Override protected V get0(K key, String taskName, boolean deserializeBinary, boolean needVer) throws IgniteCheckedException { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -540,6 +545,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ + @Override protected Map<K, V> 
getAll0(Collection<? extends K> keys, boolean deserializeBinary, boolean needVer) + throws IgniteCheckedException { + return getAllAsyncInternal(keys, + !ctx.config().isReadFromBackup(), + true, + null, + ctx.kernalContext().job().currentTaskName(), + deserializeBinary, + false, + true, + needVer, + false).get(); + } + + /** {@inheritDoc} */ @Override public IgniteInternalFuture<Map<K, V>> getAllAsync( @Nullable final Collection<? extends K> keys, final boolean forcePrimary, @@ -551,6 +571,43 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final boolean canRemap, final boolean needVer ) { + return getAllAsyncInternal(keys, + forcePrimary, + skipTx, + subjId, + taskName, + deserializeBinary, + skipVals, + canRemap, + needVer, + true); + } + + /** + * @param keys Keys. + * @param forcePrimary Force primary flag. + * @param skipTx Skip tx flag. + * @param subjId Subject ID. + * @param taskName Task name. + * @param deserializeBinary Deserialize binary flag. + * @param skipVals Skip values flag. + * @param canRemap Can remap flag. + * @param needVer Need version flag. + * @param asyncOp Async operation flag. + * @return Future. + */ + private IgniteInternalFuture<Map<K, V>> getAllAsyncInternal( + @Nullable final Collection<? 
extends K> keys, + final boolean forcePrimary, + boolean skipTx, + @Nullable UUID subjId, + final String taskName, + final boolean deserializeBinary, + final boolean skipVals, + final boolean canRemap, + final boolean needVer, + boolean asyncOp + ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); if (F.isEmpty(keys)) @@ -561,7 +618,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { CacheOperationContext opCtx = ctx.operationContextPerCall(); - subjId = ctx.subjectIdPerCall(null, opCtx); + subjId = ctx.subjectIdPerCall(subjId, opCtx); final UUID subjId0 = subjId; @@ -569,57 +626,91 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final boolean skipStore = opCtx != null && opCtx.skipStore(); - return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() { - @Override public IgniteInternalFuture<Map<K, V>> apply() { - return getAllAsync0(ctx.cacheKeysView(keys), - forcePrimary, - subjId0, - taskName, - deserializeBinary, - expiryPlc, - skipVals, - skipStore, - canRemap, - needVer); - } - }); + if (asyncOp) { + return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() { + @Override public IgniteInternalFuture<Map<K, V>> apply() { + return getAllAsync0(ctx.cacheKeysView(keys), + forcePrimary, + subjId0, + taskName, + deserializeBinary, + expiryPlc, + skipVals, + skipStore, + canRemap, + needVer); + } + }); + } + else { + return getAllAsync0(ctx.cacheKeysView(keys), + forcePrimary, + subjId0, + taskName, + deserializeBinary, + expiryPlc, + skipVals, + skipStore, + canRemap, + needVer); + } } /** {@inheritDoc} */ - @Override public V getAndPut(K key, V val, @Nullable CacheEntryPredicate filter) throws IgniteCheckedException { - return getAndPutAsync0(key, val, filter).get(); + @Override protected V getAndPut0(K key, V val, @Nullable CacheEntryPredicate filter) throws IgniteCheckedException { + return (V)update0( + key, + val, + null, + null, + true, + filter, + true, + false).get(); } /** {@inheritDoc} */ - @Override 
public boolean put(K key, V val, CacheEntryPredicate filter) throws IgniteCheckedException { - return putAsync(key, val, filter).get(); + @Override protected boolean put0(K key, V val, CacheEntryPredicate filter) throws IgniteCheckedException { + Boolean res = (Boolean)update0( + key, + val, + null, + null, + false, + filter, + true, + false).get(); + + assert res != null; + + return res; } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public IgniteInternalFuture<V> getAndPutAsync0(K key, V val, @Nullable CacheEntryPredicate filter) { - A.notNull(key, "key", val, "val"); - - return updateAsync0( + return update0( key, val, null, null, true, filter, + true, true); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public IgniteInternalFuture<Boolean> putAsync0(K key, V val, @Nullable CacheEntryPredicate filter) { - return updateAsync0( + return update0( key, val, null, null, false, filter, + true, true); } @@ -627,84 +718,34 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public V tryGetAndPut(K key, V val) throws IgniteCheckedException { A.notNull(key, "key", val, "val"); - return (V)updateAsync0( + return (V) update0( key, val, null, null, true, null, + false, false).get(); } /** {@inheritDoc} */ - @Override public V getAndPutIfAbsent(K key, V val) throws IgniteCheckedException { - return getAndPutIfAbsentAsync(key, val).get(); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<V> getAndPutIfAbsentAsync(K key, V val) { - A.notNull(key, "key", val, "val"); - - return getAndPutAsync(key, val, ctx.noVal()); - } - - /** {@inheritDoc} */ - @Override public boolean putIfAbsent(K key, V val) throws IgniteCheckedException { - return putIfAbsentAsync(key, val).get(); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> putIfAbsentAsync(K key, V val) { - A.notNull(key, "key", val, "val"); - - return putAsync(key, val, ctx.noVal()); - } - - /** {@inheritDoc} */ - 
@Override public V getAndReplace(K key, V val) throws IgniteCheckedException { - return getAndReplaceAsync(key, val).get(); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<V> getAndReplaceAsync(K key, V val) { - A.notNull(key, "key", val, "val"); - - return getAndPutAsync(key, val, ctx.hasVal()); - } - - /** {@inheritDoc} */ - @Override public boolean replace(K key, V val) throws IgniteCheckedException { - return replaceAsync(key, val).get(); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> replaceAsync(K key, V val) { - A.notNull(key, "key", val, "val"); - - return putAsync(key, val, ctx.hasVal()); - } - - /** {@inheritDoc} */ - @Override public boolean replace(K key, V oldVal, V newVal) throws IgniteCheckedException { - return replaceAsync(key, oldVal, newVal).get(); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) { - A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal"); - - return putAsync(key, newVal, ctx.equalsVal(oldVal)); - } - - /** {@inheritDoc} */ - @Override public void putAll(Map<? extends K, ? extends V> m) throws IgniteCheckedException { - putAllAsync(m).get(); + @Override protected void putAll0(Map<? extends K, ? extends V> m) throws IgniteCheckedException { + updateAll0(m, + null, + null, + null, + null, + false, + false, + true, + UPDATE, + false).get(); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> putAllAsync(Map<? extends K, ? extends V> m) { - return updateAllAsync0(m, + @Override public IgniteInternalFuture<?> putAllAsync0(Map<? extends K, ? 
extends V> m) { + return updateAll0(m, null, null, null, @@ -712,7 +753,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, false, true, - UPDATE).chain(RET2NULL); + UPDATE, + true).chain(RET2NULL); } /** {@inheritDoc} */ @@ -725,7 +767,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public IgniteInternalFuture<?> putAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> conflictMap) { ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size()); - return updateAllAsync0(null, + return updateAll0(null, null, null, conflictMap, @@ -733,57 +775,40 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, false, true, - UPDATE); + UPDATE, + true); } /** {@inheritDoc} */ - @Override public V getAndRemove(K key) throws IgniteCheckedException { - return getAndRemoveAsync(key).get(); + @Override public V getAndRemove0(K key) throws IgniteCheckedException { + return (V)remove0(key, true, null, false).get(); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture<V> getAndRemoveAsync(K key) { - A.notNull(key, "key"); - - return removeAsync0(key, true, null); + @Override public IgniteInternalFuture<V> getAndRemoveAsync0(K key) { + return remove0(key, true, null, true); } /** {@inheritDoc} */ - @Override public void removeAll(Collection<? extends K> keys) throws IgniteCheckedException { - removeAllAsync(keys).get(); + @Override protected void removeAll0(Collection<? extends K> keys) throws IgniteCheckedException { + removeAllAsync0(keys, null, false, false, false).get(); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> removeAllAsync(Collection<? extends K> keys) { - A.notNull(keys, "keys"); - - return removeAllAsync0(keys, null, false, false).chain(RET2NULL); + @Override public IgniteInternalFuture<Object> removeAllAsync0(Collection<? 
extends K> keys) { + return removeAllAsync0(keys, null, false, false, true).chain(RET2NULL); } /** {@inheritDoc} */ - @Override public boolean remove(K key) throws IgniteCheckedException { - return removeAsync(key, (CacheEntryPredicate)null).get(); + @Override protected boolean remove0(K key, CacheEntryPredicate filter) throws IgniteCheckedException { + return (Boolean)remove0(key, false, filter, false).get(); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture<Boolean> removeAsync(K key, @Nullable CacheEntryPredicate filter) { - A.notNull(key, "key"); - - return removeAsync0(key, false, filter); - } - - /** {@inheritDoc} */ - @Override public boolean remove(K key, V val) throws IgniteCheckedException { - return removeAsync(key, val).get(); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> removeAsync(K key, V val) { - A.notNull(key, "key", val, "val"); - - return removeAsync(key, ctx.equalsVal(val)); + @Override public IgniteInternalFuture<Boolean> removeAsync0(K key, @Nullable CacheEntryPredicate filter) { + return remove0(key, false, filter, true); } /** {@inheritDoc} */ @@ -796,7 +821,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<KeyCacheObject, GridCacheVersion> conflictMap) { ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size()); - return removeAllAsync0(null, conflictMap, false, false); + return removeAllAsync0(null, conflictMap, false, false, true); } /** @@ -811,10 +836,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @return Future. 
*/ @SuppressWarnings("unchecked") - protected <T> IgniteInternalFuture<T> asyncOp(final CO<IgniteInternalFuture<T>> op) { - if (!asyncToggled) - return op.apply(); - + private <T> IgniteInternalFuture<T> asyncOp(final CO<IgniteInternalFuture<T>> op) { IgniteInternalFuture<T> fail = asyncOpAcquire(); if (fail != null) @@ -871,7 +893,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public <T> EntryProcessorResult<T> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) throws IgniteCheckedException { - IgniteInternalFuture<EntryProcessorResult<T>> invokeFut = invokeAsync(key, entryProcessor, args); + IgniteInternalFuture<EntryProcessorResult<T>> invokeFut = invoke0(false, key, entryProcessor, args); EntryProcessorResult<T> res = invokeFut.get(); @@ -881,16 +903,30 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, - Object... args) - throws IgniteCheckedException { - return invokeAllAsync(keys, entryProcessor, args).get(); + Object... args) throws IgniteCheckedException + { + return invokeAll0(false, keys, entryProcessor, args).get(); } /** {@inheritDoc} */ - @SuppressWarnings("unchecked") @Override public <T> IgniteInternalFuture<EntryProcessorResult<T>> invokeAsync(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) { + return invoke0(true, key, entryProcessor, args); + } + + /** + * @param async Async operation flag. + * @param key Key. + * @param entryProcessor Entry processor. + * @param args Entry processor arguments. + * @return Future. + */ + private <T> IgniteInternalFuture<EntryProcessorResult<T>> invoke0( + boolean async, + K key, + EntryProcessor<K, V, T> entryProcessor, + Object... 
args) { A.notNull(key, "key", entryProcessor, "entryProcessor"); if (keyCheck) @@ -900,14 +936,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); - IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut = updateAsync0( + IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut = update0( key, null, entryProcessor, args, false, null, - true); + true, + async); return fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() { @Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut) @@ -940,6 +977,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, final EntryProcessor<K, V, T> entryProcessor, Object... args) { + return invokeAll0(true, keys, entryProcessor, args); + } + + /** + * @param async Async operation flag. + * @param keys Keys. + * @param entryProcessor Entry processor. + * @param args Entry processor arguments. + * @return Future. + */ + private <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAll0( + boolean async, + Set<? extends K> keys, + final EntryProcessor<K, V, T> entryProcessor, + Object... 
args) { A.notNull(keys, "keys", entryProcessor, "entryProcessor"); if (keyCheck) @@ -955,7 +1007,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); - IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> resFut = updateAllAsync0(null, + IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> resFut = updateAll0(null, invokeMap, args, null, @@ -963,7 +1015,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, false, true, - TRANSFORM); + TRANSFORM, + async); return resFut.chain( new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, Map<K, EntryProcessorResult<T>>>() { @@ -981,7 +1034,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll( Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) throws IgniteCheckedException { - return invokeAllAsync(map, args).get(); + A.notNull(map, "map"); + + if (keyCheck) + validateCacheKeys(map.keySet()); + + return (Map<K, EntryProcessorResult<T>>)updateAll0(null, + map, + args, + null, + null, + false, + false, + true, + TRANSFORM, + false).get(); } /** {@inheritDoc} */ @@ -994,7 +1061,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (keyCheck) validateCacheKeys(map.keySet()); - return updateAllAsync0(null, + return updateAll0(null, map, args, null, @@ -1002,7 +1069,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, false, true, - TRANSFORM); + TRANSFORM, + true); } /** @@ -1016,10 +1084,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param retval Return value required flag. * @param rawRetval Return {@code GridCacheReturn} instance. * @param waitTopFut Whether to wait for topology future. + * @param async Async operation flag. * @return Completion future. 
*/ @SuppressWarnings("ConstantConditions") - private IgniteInternalFuture updateAllAsync0( + private IgniteInternalFuture updateAll0( @Nullable Map<? extends K, ? extends V> map, @Nullable Map<? extends K, ? extends EntryProcessor> invokeMap, @Nullable Object[] invokeArgs, @@ -1028,7 +1097,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final boolean retval, final boolean rawRetval, final boolean waitTopFut, - final GridCacheOperation op + final GridCacheOperation op, + boolean async ) { assert ctx.updatesAllowed(); @@ -1105,13 +1175,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, waitTopFut); - return asyncOp(new CO<IgniteInternalFuture<Object>>() { - @Override public IgniteInternalFuture<Object> apply() { - updateFut.map(); + if (async) { + return asyncOp(new CO<IgniteInternalFuture<Object>>() { + @Override public IgniteInternalFuture<Object> apply() { + updateFut.map(); - return updateFut; - } - }); + return updateFut; + } + }); + } + else { + updateFut.map(); + + return updateFut; + } } /** @@ -1124,16 +1201,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param retval Return value flag. * @param filter Filter. * @param waitTopFut Whether to wait for topology future. + * @param async Async operation flag. * @return Future. 
*/ - private IgniteInternalFuture updateAsync0( + private IgniteInternalFuture update0( K key, @Nullable V val, @Nullable EntryProcessor proc, @Nullable Object[] invokeArgs, final boolean retval, @Nullable final CacheEntryPredicate filter, - final boolean waitTopFut + final boolean waitTopFut, + boolean async ) { assert val == null || proc == null; @@ -1146,13 +1225,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final GridNearAtomicAbstractUpdateFuture updateFut = createSingleUpdateFuture(key, val, proc, invokeArgs, retval, filter, waitTopFut); - return asyncOp(new CO<IgniteInternalFuture<Object>>() { - @Override public IgniteInternalFuture<Object> apply() { - updateFut.map(); + if (async) { + return asyncOp(new CO<IgniteInternalFuture<Object>>() { + @Override public IgniteInternalFuture<Object> apply() { + updateFut.map(); - return updateFut; - } - }); + return updateFut; + } + }); + } + else { + updateFut.map(); + + return updateFut; + } } /** @@ -1161,33 +1247,38 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param key Key. * @param retval Whether to return * @param filter Filter. + * @param async Async operation flag. * @return Future. */ - private IgniteInternalFuture removeAsync0(K key, final boolean retval, - @Nullable CacheEntryPredicate filter) { - final boolean statsEnabled = ctx.config().isStatisticsEnabled(); - - final long start = statsEnabled ? 
System.nanoTime() : 0L; - + private IgniteInternalFuture remove0(K key, final boolean retval, + @Nullable CacheEntryPredicate filter, + boolean async) { assert ctx.updatesAllowed(); - validateCacheKey(key); - ctx.checkSecurity(SecurityPermission.CACHE_REMOVE); - final GridNearAtomicAbstractUpdateFuture updateFut = - createSingleUpdateFuture(key, null, null, null, retval, filter, true); + final GridNearAtomicAbstractUpdateFuture updateFut = createSingleUpdateFuture(key, + null, + null, + null, + retval, + filter, + true); - if (statsEnabled) - updateFut.listen(new UpdateRemoveTimeStatClosure<>(metrics0(), start)); + if (async) { + return asyncOp(new CO<IgniteInternalFuture<Object>>() { + @Override public IgniteInternalFuture<Object> apply() { + updateFut.map(); - return asyncOp(new CO<IgniteInternalFuture<Object>>() { - @Override public IgniteInternalFuture<Object> apply() { - updateFut.map(); + return updateFut; + } + }); + } + else { + updateFut.map(); - return updateFut; - } - }); + return updateFut; + } } /** @@ -1326,14 +1417,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable Collection<? extends K> keys, @Nullable Map<KeyCacheObject, GridCacheVersion> conflictMap, final boolean retval, - boolean rawRetval + boolean rawRetval, + boolean async ) { assert ctx.updatesAllowed(); - final boolean statsEnabled = ctx.config().isStatisticsEnabled(); - - final long start = statsEnabled ? System.nanoTime() : 0L; - assert keys != null || conflictMap != null; if (keyCheck) @@ -1380,16 +1468,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { opCtx != null && opCtx.noRetries() ? 
1 : MAX_RETRIES, true); - if (statsEnabled) - updateFut.listen(new UpdateRemoveTimeStatClosure<>(metrics0(), start)); + if (async) { + return asyncOp(new CO<IgniteInternalFuture<Object>>() { + @Override public IgniteInternalFuture<Object> apply() { + updateFut.map(); - return asyncOp(new CO<IgniteInternalFuture<Object>>() { - @Override public IgniteInternalFuture<Object> apply() { - updateFut.map(); + return updateFut; + } + }); + } + else { + updateFut.map(); - return updateFut; - } - }); + return updateFut; + } } /** @@ -3248,7 +3340,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param ver Version to ack. */ private void sendDeferredUpdateResponse(UUID nodeId, GridCacheVersion ver) { - deferredUpdateMessageSender.sendDeferredAckMessage(nodeId, ver); + deferredUpdateMsgSnd.sendDeferredAckMessage(nodeId, ver); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ad785cbd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 176a90f..9cf400d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -327,19 +327,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte needVer); } - /** {@inheritDoc} */ - @Override protected GridCacheEntryEx entryExSafe( - KeyCacheObject key, - AffinityTopologyVersion topVer - ) { - try { - return ctx.affinity().localNode(key, topVer) ? 
entryEx(key) : null; - } - catch (GridDhtInvalidPartitionException ignored) { - return null; - } - } - /** * @param keys Keys to load. * @param readThrough Read through flag.
