Repository: ignite Updated Branches: refs/heads/ignite-4652 f979979a1 -> c4976dd26
ignite-4652 Atomic update refactoring to use BPlusTree.invoke. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c4976dd2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c4976dd2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c4976dd2 Branch: refs/heads/ignite-4652 Commit: c4976dd2603461f359485c2a8189ce839ab0b1c1 Parents: f979979 Author: sboikov <[email protected]> Authored: Thu Feb 9 13:29:40 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Feb 9 14:46:26 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 281 ++++++++++++------- .../cache/IgniteCacheOffheapManagerImpl.java | 5 +- 2 files changed, 176 insertions(+), 110 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c4976dd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index d1e07c6..5b7f4a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1714,108 +1714,106 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme keepBinary); } - if (updateRes.success()) { - if (c.op == GridCacheOperation.UPDATE) { - assert (isNear() && val != null) || c.newRow != null : c; - - updateVal = isNear() ? val : c.newRow.value(); + if (c.op == GridCacheOperation.UPDATE) { + updateVal = val; - assert updateVal != null : c; + assert updateVal != null : c; - drReplicate(drType, updateVal, updateVer, topVer); + drReplicate(drType, updateVal, updateVer, topVer); - recordNodeId(affNodeId, topVer); + recordNodeId(affNodeId, topVer); - if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) { - if (evtOld == null) - evtOld = cctx.unwrapTemporary(oldVal); + if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) { + if (evtOld == null) + evtOld = cctx.unwrapTemporary(oldVal); - cctx.events().addEvent(partition(), - key, - evtNodeId, - null, - newVer, - EVT_CACHE_OBJECT_PUT, - updateVal, - true, - evtOld, - evtOld != null, - subjId, - null, - taskName, - keepBinary); - } + cctx.events().addEvent(partition(), + key, + evtNodeId, + null, + newVer, + EVT_CACHE_OBJECT_PUT, + updateVal, + true, + evtOld, + evtOld != null, + subjId, + null, + taskName, + keepBinary); } - else { - assert c.op == GridCacheOperation.DELETE : c.op; + } + else { + assert c.op == GridCacheOperation.DELETE : c.op; - clearReaders(); + clearReaders(); - drReplicate(drType, null, newVer, topVer); + drReplicate(drType, null, newVer, topVer); - recordNodeId(affNodeId, topVer); + recordNodeId(affNodeId, topVer); - if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) { - if (evtOld == null) - evtOld = cctx.unwrapTemporary(oldVal); + if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) { + if (evtOld == null) + evtOld = cctx.unwrapTemporary(oldVal); - cctx.events().addEvent(partition(), - key, - evtNodeId, - null, newVer, - EVT_CACHE_OBJECT_REMOVED, - null, false, - evtOld, evtOld != null, - subjId, - null, - taskName, - keepBinary); - } + cctx.events().addEvent(partition(), + key, + evtNodeId, + null, newVer, + EVT_CACHE_OBJECT_REMOVED, + null, false, + evtOld, evtOld != null, + subjId, + null, + taskName, + keepBinary); } + } - updateMetrics(c.op, metrics); + updateMetrics(c.op, metrics); - // Continuous query filter should be perform under lock. - if (lsnrs != null) { - CacheObject evtVal = cctx.unwrapTemporary(updateVal); - CacheObject evtOldVal = cctx.unwrapTemporary(oldVal); + // Continuous query filter should be perform under lock. + if (lsnrs != null) { + CacheObject evtVal = cctx.unwrapTemporary(updateVal); + CacheObject evtOldVal = cctx.unwrapTemporary(oldVal); - cctx.continuousQueries().onEntryUpdated(lsnrs, + cctx.continuousQueries().onEntryUpdated(lsnrs, + key, + evtVal, + evtOldVal, + internal, + partition(), + primary, + false, + c.updateRes.updateCounter(), + fut, + topVer); + } + + cctx.dataStructures().onEntryUpdated(key, c.op == GridCacheOperation.DELETE, keepBinary); + + if (intercept) { + if (c.op == GridCacheOperation.UPDATE) { + cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry( + cctx, key, - evtVal, - evtOldVal, - internal, - partition(), - primary, - false, - c.updateRes.updateCounter(), - fut, - topVer); + null, + updateVal, + null, + keepBinary, + c.updateRes.updateCounter())); } + else { + assert c.op == GridCacheOperation.DELETE : c.op; - cctx.dataStructures().onEntryUpdated(key, c.op == GridCacheOperation.DELETE, keepBinary); - - if (intercept) { - if (op == GridCacheOperation.UPDATE) { - cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry( - cctx, - key, - null, - updateVal, - null, - keepBinary, - c.updateRes.updateCounter())); - } - else { - cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry( - cctx, - key, - null, - oldVal, - null, - keepBinary, - c.updateRes.updateCounter())); - } + cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry( + cctx, + key, + null, + oldVal, + null, + keepBinary, + c.updateRes.updateCounter())); } } } @@ -1872,6 +1870,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * @return Result. */ private GridTuple3<Long, Long, Boolean> ttlAndExpireTime(IgniteCacheExpiryPolicy expiry, long ttl, long expireTime) { + assert !obsolete(); + boolean rmv = false; // 1. If TTL is not changed, then calculate it based on expiry. @@ -3980,7 +3980,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheInvokeEntry<Object, Object> invokeEntry = null; IgniteBiTuple<Object, Exception> invokeRes = null; - if (op == TRANSFORM) { + boolean invoke = op == TRANSFORM; + + if (invoke) { invokeEntry = new CacheInvokeEntry<>(entry.key, oldVal, entry.ver, keepBinary, entry); invokeRes = runEntryProcessor(invokeEntry); @@ -3990,7 +3992,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheObject newVal = (CacheObject)writeObj; - GridCacheVersionConflictContext<?, ?> conflictCtx; + GridCacheVersionConflictContext<?, ?> conflictCtx = null; if (conflictResolve) { conflictCtx = resolveConflict(newVal, invokeRes); @@ -4002,9 +4004,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme return; } } - else { - conflictCtx = null; + if (conflictCtx == null) { // Perform version check only in case there was no explicit conflict resolution. versionCheck(invokeRes); @@ -4019,10 +4020,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean pass = cctx.isAllLocked(entry, filter); if (!pass) { - // TODO -// if (expiryPlc != null && !readFromStore && entry.val != null && !cctx.putIfAbsentFilter(filter)) -// updateTtl(expiryPlc); - treeOp = IgniteTree.OperationType.NOOP; + initResultOnCancelUpdate(storeLoadedVal, !cctx.putIfAbsentFilter(filter)); updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.FILTER_FAILED, oldVal, @@ -4038,14 +4036,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } - if (op == TRANSFORM) { - assert invokeEntry != null; - + if (invoke) { if (!invokeEntry.modified()) { - // TODO -// if (expiryPlc != null && !readFromStore && entry.val != null) -// updateTtl(expiryPlc); - treeOp = IgniteTree.OperationType.NOOP; + initResultOnCancelUpdate(storeLoadedVal, true); updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.INVOKE_NO_OP, oldVal, @@ -4076,24 +4069,90 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (op == UPDATE) { assert writeObj != null; - update(conflictCtx, invokeRes); + update(conflictCtx, invokeRes, storeLoadedVal != null); } else { assert op == DELETE && writeObj == null : op; - remove(conflictCtx, invokeRes); + remove(conflictCtx, invokeRes, storeLoadedVal != null); } assert updateRes != null && treeOp != null; } /** + * @param storeLoadedVal Value loaded from store. + * @param updateExpireTime {@code True} if need update expire time. + * @throws IgniteCheckedException If failed. + */ + private void initResultOnCancelUpdate(@Nullable CacheObject storeLoadedVal, boolean updateExpireTime) + throws IgniteCheckedException { + boolean needUpdate = false; + + if (storeLoadedVal != null) { + long initTtl; + long initExpireTime; + + if (expiryPlc != null) { + IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc); + + initTtl = initTtlAndExpireTime.get1(); + initExpireTime = initTtlAndExpireTime.get2(); + } + else { + initTtl = CU.TTL_ETERNAL; + initExpireTime = CU.EXPIRE_TIME_ETERNAL; + } + + entry.update(storeLoadedVal, initExpireTime, initTtl, entry.ver, true); + + needUpdate = true; + } + else if (updateExpireTime && expiryPlc != null && entry.val != null){ + long ttl = expiryPlc.forAccess(); + + if (ttl != CU.TTL_NOT_CHANGED) { + long expireTime; + + if (ttl == CU.TTL_ZERO) { + ttl = CU.TTL_MINIMUM; + expireTime = CU.expireTimeInPast(); + } + else + expireTime = CU.toExpireTime(ttl); + + if (entry.expireTimeExtras() != expireTime) { + entry.update(entry.val, expireTime, ttl, entry.ver, true); + + expiryPlc.ttlUpdated(entry.key, entry.ver, null); + + needUpdate = true; + } + } + } + + if (needUpdate) { + newRow = entry.localPartition().dataStore().createRow(entry.key, + storeLoadedVal, + newVer, + entry.expireTimeExtras(), + oldRow); + + treeOp = IgniteTree.OperationType.PUT; + } + else + treeOp = IgniteTree.OperationType.NOOP; + } + + /** * @param conflictCtx Conflict context. * @param invokeRes Entry processor result (for invoke operation). + * @param readFromStore {@code True} if initial entry value was {@code null} and it was read from store. * @throws IgniteCheckedException If failed. */ private void update(@Nullable GridCacheVersionConflictContext<?, ?> conflictCtx, - @Nullable IgniteBiTuple<Object, Exception> invokeRes) + @Nullable IgniteBiTuple<Object, Exception> invokeRes, + boolean readFromStore) throws IgniteCheckedException { GridCacheContext cctx = entry.context(); @@ -4134,7 +4193,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme writeObj = null; - remove(conflictCtx, invokeRes); + remove(conflictCtx, invokeRes, readFromStore); return; } @@ -4151,12 +4210,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } if (intercept) { - Object updated0 = entry.value(null, updated, keepBinary, false); + Object updated0 = cctx.unwrapBinaryIfNeeded(updated, keepBinary, false); CacheLazyEntry<Object, Object> interceptEntry = new CacheLazyEntry<>(cctx, entry.key, null, - entry.val, + oldVal, null, keepBinary); @@ -4174,6 +4233,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, 0); + + return; } else if (interceptorVal != updated0) { updated0 = cctx.unwrapTemporary(interceptorVal); @@ -4238,11 +4299,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** * @param conflictCtx Conflict context. * @param invokeRes Entry processor result (for invoke operation). + * @param readFromStore {@code True} if initial entry value was {@code null} and it was read from store. * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") private void remove(@Nullable GridCacheVersionConflictContext<?, ?> conflictCtx, - @Nullable IgniteBiTuple<Object, Exception> invokeRes) + @Nullable IgniteBiTuple<Object, Exception> invokeRes, + boolean readFromStore) throws IgniteCheckedException { GridCacheContext cctx = entry.context(); @@ -4255,7 +4318,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheLazyEntry<Object, Object> intercepEntry = new CacheLazyEntry<>(cctx, entry.key, null, - oldVal, null, + oldVal, + null, keepBinary); interceptRes = cctx.config().getInterceptor().onBeforeRemove(intercepEntry); @@ -4308,7 +4372,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme entry.update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer, true); - treeOp = oldVal == null ? IgniteTree.OperationType.NOOP : IgniteTree.OperationType.REMOVE; + treeOp = (oldVal == null || readFromStore) ? IgniteTree.OperationType.NOOP : + IgniteTree.OperationType.REMOVE; UpdateOutcome outcome = oldVal != null ? UpdateOutcome.SUCCESS : UpdateOutcome.REMOVE_NO_VAL; http://git-wip-us.apache.org/repos/asf/ignite/blob/c4976dd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 62a5cc3..eed9f09 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -1057,6 +1057,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple * @throws IgniteCheckedException If failed. */ private void finishUpdate(CacheDataRow newRow, @Nullable CacheDataRow oldRow) throws IgniteCheckedException { + if (oldRow == null) + storageSize.incrementAndGet(); + KeyCacheObject key = newRow.key(); long expireTime = newRow.expireTime(); @@ -1093,8 +1096,6 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple if (newRow.link() != oldRow.link()) rowStore.removeRow(oldRow.link()); } - else - storageSize.incrementAndGet(); if (pendingEntries != null && expireTime != 0) pendingEntries.putx(new PendingRow(expireTime, newRow.link()));
