Repository: ignite Updated Branches: refs/heads/ignite-1093-2 a8b323d74 -> 5de124cd9
ignite-973 Fixed atomic cache 'remove' to always provide old value for indexing Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/621eb0f7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/621eb0f7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/621eb0f7 Branch: refs/heads/ignite-1093-2 Commit: 621eb0f75bbe1a0a623229dded38a3549309eead Parents: 8b94494 Author: sboikov <[email protected]> Authored: Mon Sep 21 21:37:52 2015 +0300 Committer: sboikov <[email protected]> Committed: Mon Sep 21 21:37:52 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 37 +++++++++++++------- .../processors/cache/GridCacheProcessor.java | 2 +- .../processors/cache/GridCacheSwapManager.java | 24 ++++++------- .../datastreamer/DataStreamerImpl.java | 2 -- 4 files changed, 37 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index f2bb646..961c792 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1588,6 +1588,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean hasValPtr = hasOffHeapPointer(); + if (old == null) + old = saveValueForIndexUnlocked(); + // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. clearIndex(old); @@ -2163,6 +2166,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Must persist inside synchronization in non-tx mode. cctx.store().remove(null, keyValue(false)); + if (oldVal == null) + oldVal = saveValueForIndexUnlocked(); + // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. clearIndex(oldVal); @@ -3342,7 +3348,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme try { synchronized (this) { - CacheObject expiredVal = saveValueForIndexUnlocked(); + CacheObject expiredVal = saveOldValueUnlocked(false); boolean hasOldBytes = hasOffHeapPointer(); @@ -3523,12 +3529,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme try { GridCacheQueryManager qryMgr = cctx.queries(); - if (qryMgr != null && qryMgr.enabled()) { - qryMgr.store(key, - val, - ver, - expireTime); - } + if (qryMgr.enabled()) + qryMgr.store(key, val, ver, expireTime); } catch (IgniteCheckedException e) { throw new GridCacheIndexUpdateException(e); @@ -3547,8 +3549,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme try { GridCacheQueryManager<?, ?> qryMgr = cctx.queries(); - if (qryMgr != null) - qryMgr.remove(key(), prevVal == null ? null : prevVal); + if (qryMgr.enabled()) + qryMgr.remove(key(), prevVal); } catch (IgniteCheckedException e) { throw new GridCacheIndexUpdateException(e); @@ -3562,10 +3564,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * @return Previous value or {@code null}. * @throws IgniteCheckedException If failed to retrieve previous value. */ - protected CacheObject saveValueForIndexUnlocked() throws IgniteCheckedException { + protected final CacheObject saveValueForIndexUnlocked() throws IgniteCheckedException { + return saveOldValueUnlocked(true); + } + + /** + * @param qryOnly If {@code true} reads old value only if query indexing is enabled. + * @return Previous value or {@code null}. + * @throws IgniteCheckedException If failed to retrieve previous value. + */ + private CacheObject saveOldValueUnlocked(boolean qryOnly) throws IgniteCheckedException { assert Thread.holdsLock(this); - if (cctx.queries() == null) + if (qryOnly && !cctx.queries().enabled()) return null; CacheObject val = rawGetOrUnmarshalUnlocked(false); @@ -3681,7 +3692,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (obsoleteVersionExtras() != null) return true; - CacheObject prev = saveValueForIndexUnlocked(); + CacheObject prev = saveOldValueUnlocked(false); if (!hasReaders() && markObsolete0(obsoleteVer, false)) { if (swap) { @@ -3791,7 +3802,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme GridCacheQueryManager qryMgr = cctx.queries(); - if (qryMgr != null) + if (qryMgr.enabled()) qryMgr.onUnswap(key, prevVal); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index c92de7d..7c16136 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -2759,7 +2759,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (spaceName.equals(CU.swapSpaceName(cctx))) { GridCacheQueryManager qryMgr = cctx.queries(); - if (qryMgr != null) { + if (qryMgr.enabled()) { try { KeyCacheObject key = cctx.toCacheKeyObject(keyBytes); http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index 9b6381e..d9a8b5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -696,12 +696,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { final GridCacheQueryManager qryMgr = cctx.queries(); - if (qryMgr != null && !readSwapBeforeRemove(key, swapKey, ldr)) + if (qryMgr.enabled() && !readSwapBeforeRemove(key, swapKey, ldr)) return null; // Not found. swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() { @Override public void apply(byte[] rmv) { - if (qryMgr == null && cctx.config().isStatisticsEnabled()) + if (!qryMgr.enabled() && cctx.config().isStatisticsEnabled()) cctx.cache().metrics0().onSwapRead(rmv != null); if (rmv != null) { @@ -843,7 +843,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { GridCacheSwapEntry entry; - if (qryMgr != null) { + if (qryMgr.enabled()) { entry = readOffheapBeforeRemove(key, keyBytes, part); if (entry != null) { @@ -952,7 +952,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { ClassLoader ldr = cctx.deploy().globalLoader(); - if (qryMgr != null) { // Unswap for indexing. + if (qryMgr.enabled()) { // Unswap for indexing. Iterator<SwapKey> iter = unprocessedKeys.iterator(); while (iter.hasNext()) { @@ -967,7 +967,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { unprocessedKeys, new IgniteBiInClosure<SwapKey, byte[]>() { @Override public void apply(SwapKey swapKey, byte[] rmv) { - if (qryMgr == null && cctx.config().isStatisticsEnabled()) + if (!qryMgr.enabled() && cctx.config().isStatisticsEnabled()) cctx.cache().metrics0().onSwapRead(rmv != null); if (rmv != null) { @@ -1124,7 +1124,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { */ public GridCacheSwapEntry readOffheapBeforeRemove(KeyCacheObject key, byte[] keyBytes, int part) throws IgniteCheckedException { - assert cctx.queries() != null; + assert cctx.queries().enabled(); byte[] entryBytes = offheap.get(spaceName, part, key, keyBytes); @@ -1155,7 +1155,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { */ private boolean readSwapBeforeRemove(@Nullable KeyCacheObject key, SwapKey swapKey, ClassLoader ldr) throws IgniteCheckedException { - assert cctx.queries() != null; + assert cctx.queries().enabled(); byte[] entryBytes = swapMgr.read(spaceName, swapKey, ldr); @@ -1196,7 +1196,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (offheapEnabled) { byte[] keyBytes = key.valueBytes(cctx.cacheObjectContext()); - if ((qryMgr == null || readOffheapBeforeRemove(key, keyBytes, part) != null) && + if ((!qryMgr.enabled() || readOffheapBeforeRemove(key, keyBytes, part) != null) && offheap.removex(spaceName, part, key, keyBytes)) { if (cctx.config().isStatisticsEnabled()) cctx.cache().metrics0().onOffHeapRemove(); @@ -1212,7 +1212,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { ClassLoader ldr = cctx.deploy().globalLoader(); - if (qryMgr != null && !readSwapBeforeRemove(key, swapKey, ldr)) + if (qryMgr.enabled() && !readSwapBeforeRemove(key, swapKey, ldr)) return; // Not found. swapMgr.remove(spaceName, @@ -1279,7 +1279,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { GridCacheQueryManager qryMgr = cctx.queries(); - if (qryMgr != null) + if (qryMgr.enabled()) qryMgr.onSwap(key); } @@ -1308,7 +1308,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { cctx.events().addEvent(swapEntry.partition(), swapEntry.key(), cctx.nodeId(), (IgniteUuid)null, null, EVT_CACHE_OBJECT_TO_OFFHEAP, null, false, null, true, null, null, null); - if (qryMgr != null) + if (qryMgr.enabled()) qryMgr.onSwap(swapEntry.key()); } } @@ -1330,7 +1330,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { cctx.events().addEvent(batchSwapEntry.partition(), batchSwapEntry.key(), cctx.nodeId(), (IgniteUuid)null, null, EVT_CACHE_OBJECT_SWAPPED, null, false, null, true, null, null, null); - if (qryMgr != null) + if (qryMgr.enabled()) qryMgr.onSwap(batchSwapEntry.key()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index b5d9a7d..ab2a6e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -1569,8 +1569,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer); - entry.unswap(false); - if (plc != null) { ttl = CU.toTtl(plc.getExpiryForCreation());
