Repository: ignite Updated Branches: refs/heads/ignite-3478-initialVal [created] 3a2fbcc16
More optimal GridCacheMapEntry.initialValue. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3a2fbcc1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3a2fbcc1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3a2fbcc1 Branch: refs/heads/ignite-3478-initialVal Commit: 3a2fbcc1642bc8fe567b626779651dde0f66572a Parents: 031f63c Author: sboikov <[email protected]> Authored: Fri Oct 27 16:07:22 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Oct 27 16:07:22 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 119 +++++++++++++++---- .../cache/IgniteCacheOffheapManager.java | 5 - .../cache/IgniteCacheOffheapManagerImpl.java | 12 -- .../distributed/dht/GridDhtLockFuture.java | 26 +++- .../distributed/dht/GridDhtTxPrepareFuture.java | 23 +++- .../dht/preloader/GridDhtForceKeysFuture.java | 20 +++- .../datastreamer/DataStreamerImpl.java | 20 ++-- 7 files changed, 162 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3a2fbcc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index e46e4d2..789a302 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -2541,35 +2541,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme synchronized (this) { checkObsolete(); - boolean update; - boolean walEnabled = !cctx.isNear() && cctx.group().persistenceEnabled(); - if (cctx.shared().database().persistenceEnabled()) { - unswap(false); + long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime; - if (!isNew()) { - if (cctx.atomic()) - update = ATOMIC_VER_COMPARATOR.compare(this.ver, ver) < 0; - else - update = this.ver.compareTo(ver) < 0; - } - else - update = true; - } - else - update = isNew() && !cctx.offheap().containsKey(this); + InitialValueClosure c = new InitialValueClosure(this, val, ver, expTime); - update |= !preload && deletedUnlocked(); + cctx.offheap().invoke(cctx, key, localPartition(), c); - if (update) { - long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime; + boolean update = c.update || (!preload && deletedUnlocked()); + if (update) { val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); - if (val != null) - storeValue(val, expTime, ver, null); - update(val, expTime, ttl, ver, true); boolean skipQryNtf = false; @@ -3915,6 +3899,99 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** * */ + private static class InitialValueClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure { + /** */ + private final GridCacheMapEntry entry; + + /** */ + private final CacheObject val; + + /** */ + private final GridCacheVersion ver; + + /** */ + private final long expireTime; + + /** */ + private IgniteTree.OperationType op; + + /** */ + private CacheDataRow oldRow; + + /** */ + private CacheDataRow newRow; + + /** */ + private boolean update; + + /** + * @param entry Entry. + * @param val Value. + * @param ver Version. + * @param expireTime Expire time. + */ + InitialValueClosure(GridCacheMapEntry entry, CacheObject val, GridCacheVersion ver, long expireTime) { + this.entry = entry; + this.val = val; + this.ver = ver; + this.expireTime = expireTime; + } + + /** {@inheritDoc} */ + @Nullable @Override public CacheDataRow oldRow() { + return oldRow; + } + + /** {@inheritDoc} */ + @Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException { + this.oldRow = oldRow; + + if (oldRow != null) { + GridCacheContext cctx = entry.context(); + + if (cctx.shared().database().persistenceEnabled()) { + if (cctx.atomic()) + update = ATOMIC_VER_COMPARATOR.compare(oldRow.version(), ver) < 0; + else + update = oldRow.version().compareTo(ver) < 0; + } + else + update = false; + } + else + update = true; + + if (update && val != null) { + op = IgniteTree.OperationType.PUT; + + newRow = entry.localPartition().dataStore().createRow( + entry.cctx, + entry.key, + val, + ver, + expireTime, + oldRow); + } + else + op = IgniteTree.OperationType.NOOP; + } + + /** {@inheritDoc} */ + @Override public CacheDataRow newRow() { + return newRow; + } + + /** {@inheritDoc} */ + @Override public IgniteTree.OperationType operationType() { + assert op != null; + + return op; + } + } + + /** + * + */ private static class AtomicCacheUpdateClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure { /** */ private final GridCacheMapEntry entry; http://git-wip-us.apache.org/repos/asf/ignite/blob/3a2fbcc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 4531802..51d5ae7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -133,11 +133,6 @@ public interface IgniteCacheOffheapManager { public void destroyCacheDataStore(CacheDataStore store) throws IgniteCheckedException; /** - * TODO: GG-10884, used on only from initialValue. - */ - public boolean containsKey(GridCacheMapEntry entry); - - /** * @param cctx Cache context. * @param c Closure. * @throws IgniteCheckedException If failed. http://git-wip-us.apache.org/repos/asf/ignite/blob/3a2fbcc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 7944c50..c5fc704 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -403,18 +403,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public boolean containsKey(GridCacheMapEntry entry) { - try { - return read(entry) != null; - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to read value", e); - - return false; - } - } - - /** {@inheritDoc} */ @Override public void onPartitionCounterUpdated(int part, long cntr) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/3a2fbcc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index e0a0260..7422403 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -1256,17 +1256,31 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo cctx.shared().database().checkpointReadLock(); try { - if (entry.initialValue(info.value(), + if (entry.initialValue( + info.value(), info.version(), info.ttl(), info.expireTime(), - true, topVer, + true, + topVer, replicate ? DR_PRELOAD : DR_NONE, false)) { - if (rec && !entry.isInternal()) - cctx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(), - (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, info.value(), true, null, - false, null, null, null, false); + if (rec && !entry.isInternal()) { + cctx.events().addEvent(entry.partition(), + entry.key(), + cctx.localNodeId(), + (IgniteUuid)null, + null, + EVT_CACHE_REBALANCE_OBJECT_LOADED, + info.value(), + true, + null, + false, + null, + null, + null, + false); + } } } finally { http://git-wip-us.apache.org/repos/asf/ignite/blob/3a2fbcc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 6873890..63f742c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -1820,7 +1820,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite GridDrType drType = cacheCtx.isDrEnabled() ? GridDrType.DR_PRELOAD : GridDrType.DR_NONE; try { - if (entry.initialValue(info.value(), + if (entry.initialValue( + info.value(), info.version(), info.ttl(), info.expireTime(), @@ -1828,10 +1829,22 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite topVer, drType, false)) { - if (rec && !entry.isInternal()) - cacheCtx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(), - (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, info.value(), true, null, - false, null, null, null, false); + if (rec && !entry.isInternal()) { + cacheCtx.events().addEvent(entry.partition(), + entry.key(), + cctx.localNodeId(), + (IgniteUuid)null, + null, + EVT_CACHE_REBALANCE_OBJECT_LOADED, + info.value(), + true, + null, + false, + null, + null, + null, + false); + } if (retVal && !invoke) ret.value(cacheCtx, info.value(), false); http://git-wip-us.apache.org/repos/asf/ignite/blob/3a2fbcc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index fe216a0..ff039bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@ -544,10 +544,22 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec replicate ? DR_PRELOAD : DR_NONE, false )) { - if (rec && !entry.isInternal()) - cctx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(), - (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, info.value(), true, null, - false, null, null, null, false); + if (rec && !entry.isInternal()) { + cctx.events().addEvent(entry.partition(), + entry.key(), + cctx.localNodeId(), + (IgniteUuid)null, + null, + EVT_CACHE_REBALANCE_OBJECT_LOADED, + info.value(), + true, + null, + false, + null, + null, + null, + false); + } } } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/3a2fbcc1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index d38132f..54c61b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -2080,16 +2080,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed expiryTime = CU.toExpireTime(ttl); } - boolean primary = cctx.affinity().primaryByKey(cctx.localNode(), entry.key(), topVer); - - entry.initialValue(e.getValue(), - ver, - ttl, - expiryTime, - false, - topVer, - primary ? GridDrType.DR_LOAD : GridDrType.DR_PRELOAD, - false); + boolean primary = cctx.affinity().primaryByKey(cctx.localNode(), entry.key(), topVer); + + entry.initialValue(e.getValue(), + ver, + ttl, + expiryTime, + false, + topVer, + primary ? GridDrType.DR_LOAD : GridDrType.DR_PRELOAD, + false); cctx.evicts().touch(entry, topVer);
