ignite-4652 Implemented BPlusTree.invoke
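For context, this change collapses the previous read-then-write path in GridCacheMapEntry into a single tree invoke: the caller hands the tree a closure that is shown the current row and reports whether the tree should PUT a new row, REMOVE the existing one, or do nothing (NOOP), as in the UpdateClosure and AtomicCacheUpdateClosure added in the diff below. The standalone sketch that follows only illustrates that closure contract; the Tree class and its TreeMap backing are stand-ins, not Ignite's BPlusTree or IgniteCacheOffheapManager API, though the closure method names and the OperationType values mirror those visible in this commit.

```java
import java.util.TreeMap;

/** Illustrative model of the invoke(key, closure) pattern introduced by this commit. */
public class InvokeSketch {
    /** Mirrors IgniteTree.OperationType as used in the diff. */
    enum OperationType { PUT, REMOVE, NOOP }

    /** Mirrors the invoke-closure shape: inspect the old row, then report the resulting tree operation. */
    interface InvokeClosure<T> {
        void call(T oldRow) throws Exception; // oldRow may be null if the key is absent
        T newRow();                           // row to store when operationType() == PUT
        OperationType operationType();        // PUT, REMOVE or NOOP
    }

    /** Toy ordered map standing in for the tree; invoke = read + closure + conditional write in one step. */
    static class Tree<K extends Comparable<K>, T> {
        private final TreeMap<K, T> map = new TreeMap<>();

        synchronized void invoke(K key, InvokeClosure<T> c) throws Exception {
            c.call(map.get(key));

            switch (c.operationType()) {
                case PUT:
                    map.put(key, c.newRow());
                    break;

                case REMOVE:
                    map.remove(key);
                    break;

                case NOOP:
                    break; // closure decided nothing needs to change
            }
        }

        synchronized T get(K key) {
            return map.get(key);
        }
    }

    public static void main(String[] args) throws Exception {
        Tree<Long, String> tree = new Tree<>();

        // Closure that creates the row if absent, or appends a marker to the existing value.
        tree.invoke(1L, new InvokeClosure<String>() {
            private String newRow;

            @Override public void call(String oldRow) {
                newRow = oldRow == null ? "v1" : oldRow + "-updated";
            }

            @Override public String newRow() {
                return newRow;
            }

            @Override public OperationType operationType() {
                return OperationType.PUT;
            }
        });

        System.out.println(tree.get(1L)); // prints "v1"
    }
}
```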
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee28b9cb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee28b9cb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee28b9cb Branch: refs/heads/ignite-3477 Commit: ee28b9cb89400af6fcddd89a52fcd1adbcd5d4ff Parents: 40f015d Author: sboikov <[email protected]> Authored: Wed Feb 22 09:55:50 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Feb 22 09:55:50 2017 +0300 ---------------------------------------------------------------------- .../benchmarks/jmh/tree/BPlusTreeBenchmark.java | 3 +- .../internal/pagemem/wal/record/DataRecord.java | 10 +- .../processors/cache/GridCacheMapEntry.java | 1729 +++++++++++------- .../cache/GridCacheUpdateAtomicResult.java | 96 +- .../cache/IgniteCacheOffheapManager.java | 48 + .../cache/IgniteCacheOffheapManagerImpl.java | 236 ++- .../processors/cache/database/CacheDataRow.java | 6 + .../cache/database/CacheDataRowAdapter.java | 13 + .../cache/database/MetadataStorage.java | 2 +- .../cache/database/tree/BPlusTree.java | 936 ++++++++-- .../distributed/dht/GridDhtCacheEntry.java | 5 + .../apache/ignite/internal/util/IgniteTree.java | 47 + .../processors/database/BPlusTreeSelfTest.java | 272 ++- .../database/FreeListImplSelfTest.java | 5 + .../processors/query/h2/database/H2Tree.java | 2 +- .../processors/query/h2/opt/GridH2Row.java | 5 + .../query/h2/opt/GridH2TreeIndex.java | 5 + 17 files changed, 2462 insertions(+), 958 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/tree/BPlusTreeBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/tree/BPlusTreeBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/tree/BPlusTreeBenchmark.java index 7355850..dc74363 100644 --- a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/tree/BPlusTreeBenchmark.java +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/tree/BPlusTreeBenchmark.java @@ -190,7 +190,8 @@ public class BPlusTreeBenchmark extends JmhAbstractBenchmark { } /** {@inheritDoc} */ - @Override protected Long getRow(BPlusIO<Long> io, long pageAddr, int idx) throws IgniteCheckedException { + @Override protected Long getRow(BPlusIO<Long> io, long pageAddr, int idx, Object ignore) + throws IgniteCheckedException { assert io.canGetRow() : io; return io.getLookupRow(this, pageAddr, idx); http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java index 6592852..d2747f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java @@ -17,15 +17,10 @@ package org.apache.ignite.internal.pagemem.wal.record; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; -import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; - -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; /** * @@ -68,6 +63,7 @@ public class DataRecord extends WALRecord { return writeEntries == null ? Collections.<DataEntry>emptyList() : writeEntries; } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DataRecord.class, this, super.toString()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 6dc1d04..d28ea25 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -40,7 +40,9 @@ import org.apache.ignite.internal.pagemem.wal.StorageException; import org.apache.ignite.internal.pagemem.wal.record.DataEntry; import org.apache.ignite.internal.pagemem.wal.record.DataRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome; import org.apache.ignite.internal.processors.cache.database.CacheDataRow; +import org.apache.ignite.internal.processors.cache.database.CacheDataRowAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture; @@ -60,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConfl import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.processors.dr.GridDrType; +import org.apache.ignite.internal.util.IgniteTree; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; import org.apache.ignite.internal.util.lang.GridTuple; @@ -83,6 +86,8 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_UNLOCKED; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; /** @@ -1535,11 +1540,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public GridCacheUpdateAtomicResult 
innerUpdate( - GridCacheVersion newVer, + final GridCacheVersion newVer, final UUID evtNodeId, final UUID affNodeId, - GridCacheOperation op, - @Nullable Object writeObj, + final GridCacheOperation op, + @Nullable final Object writeObj, @Nullable final Object[] invokeArgs, final boolean writeThrough, final boolean readThrough, @@ -1555,42 +1560,22 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme final GridDrType drType, final long explicitTtl, final long explicitExpireTime, - @Nullable GridCacheVersion conflictVer, + @Nullable final GridCacheVersion conflictVer, final boolean conflictResolve, final boolean intercept, @Nullable final UUID subjId, final String taskName, @Nullable final CacheObject prevVal, @Nullable final Long updateCntr, - @Nullable GridDhtAtomicAbstractUpdateFuture fut + @Nullable final GridDhtAtomicAbstractUpdateFuture fut ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException { - assert cctx.atomic(); - - boolean res = true; - - CacheObject oldVal; - CacheObject updated; - - GridCacheVersion enqueueVer = null; + assert cctx.atomic() && !detached(); - GridCacheVersionConflictContext<?, ?> conflictCtx = null; - - IgniteBiTuple<Object, Exception> invokeRes = null; - - // System TTL/ET which may have special values. - long newSysTtl; - long newSysExpireTime; - - // TTL/ET which will be passed to entry on update. - long newTtl; - long newExpireTime; - - Object key0 = null; - Object updated0 = null; - - Long updateCntr0 = null; + AtomicCacheUpdateClosure c; synchronized (this) { + checkObsolete(); + boolean internal = isInternal() || !context().userCache(); Map<UUID, CacheContinuousQueryListener> lsnrs = cctx.continuousQueries().updateListeners(internal, false); @@ -1598,679 +1583,270 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter); - checkObsolete(); - - CacheDataRow oldRow = null; - - // Load and remove from swap if it is new. - if (isStartVersion()) - oldRow = unswap(retval, false); - - // Prepare old value. - oldVal = val; - // Possibly read value from store. - boolean readFromStore = false; - - Object old0 = null; - - if (readThrough && needVal && oldVal == null && (cctx.readThrough() && - (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) { - old0 = readThrough(null, key, false, subjId, taskName); - - oldVal = cctx.toCacheObject(old0); - - readFromStore = true; - - // Detach value before index update. - oldVal = cctx.kernalContext().cacheObjects().prepareForCache(oldVal, cctx); - - // Calculate initial TTL and expire time. 
- long initTtl; - long initExpireTime; - - if (expiryPlc != null && oldVal != null) { - IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc); - - initTtl = initTtlAndExpireTime.get1(); - initExpireTime = initTtlAndExpireTime.get2(); - } - else { - initTtl = CU.TTL_ETERNAL; - initExpireTime = CU.EXPIRE_TIME_ETERNAL; - } + boolean readFromStore = readThrough && needVal && (cctx.readThrough() && + (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue())); + + c = new AtomicCacheUpdateClosure(this, + newVer, + op, + writeObj, + invokeArgs, + readFromStore, + writeThrough, + keepBinary, + expiryPlc, + primary, + verCheck, + filter, + explicitTtl, + explicitExpireTime, + conflictVer, + conflictResolve, + intercept, + updateCntr); + + key.valueBytes(cctx.cacheObjectContext()); - if (oldVal != null) - storeValue(oldVal, initExpireTime, ver, oldRow); - // else nothing to do, real old value was null. - - update(oldVal, initExpireTime, initTtl, ver, true); + if (isNear()) { + CacheDataRow dataRow = val != null ? new CacheDataRowAdapter(key, val, ver, expireTimeExtras()) : null; - if (deletedUnlocked() && oldVal != null && !isInternal()) - deletedUnlocked(false); + c.call(dataRow); } + else + cctx.offheap().invoke(key, localPartition(), c); - Object transformClo = null; - - // Request-level conflict resolution is needed, i.e. we do not know who will win in advance. - if (conflictResolve) { - GridCacheVersion oldConflictVer = version().conflictVersion(); - - // Cache is conflict-enabled. - if (cctx.conflictNeedResolve()) { - GridCacheVersionedEntryEx newEntry; - - GridTuple3<Long, Long, Boolean> expiration = ttlAndExpireTime(expiryPlc, - explicitTtl, - explicitExpireTime); - - // Prepare old and new entries for conflict resolution. - GridCacheVersionedEntryEx oldEntry = versionedEntry(keepBinary); - - if (op == GridCacheOperation.TRANSFORM) { - transformClo = writeObj; + GridCacheUpdateAtomicResult updateRes = c.updateRes; - EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj; + assert updateRes != null : c; - oldVal = this.val; + CacheObject oldVal = c.oldRow != null ? c.oldRow.value() : null; + CacheObject updateVal = null; + GridCacheVersion updateVer = c.newVer; - CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(key, oldVal, version(), - keepBinary, this); + // Apply metrics. + if (metrics && + updateRes.outcome().updateReadMetrics() && + cctx.cache().configuration().isStatisticsEnabled() && + needVal) { + // PutIfAbsent methods must not update hit/miss statistics. 
+ if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || !cctx.putIfAbsentFilter(filter)) + cctx.cache().metrics0().onRead(oldVal != null); + } - try { - Object computed = entryProcessor.process(entry, invokeArgs); + switch (updateRes.outcome()) { + case VERSION_CHECK_FAILED: { + if (!cctx.isNear()) { + CacheObject evtVal; - if (entry.modified()) - writeObj = cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())); - else - writeObj = oldVal; + if (op == GridCacheOperation.TRANSFORM) { + EntryProcessor<Object, Object, ?> entryProcessor = + (EntryProcessor<Object, Object, ?>)writeObj; - key0 = entry.key(); + CacheInvokeEntry<Object, Object> entry = + new CacheInvokeEntry<>(key, prevVal, version(), keepBinary, this); - if (computed != null) - invokeRes = new IgniteBiTuple(cctx.unwrapTemporary(computed), null); - } - catch (Exception e) { - invokeRes = new IgniteBiTuple(null, e); + try { + entryProcessor.process(entry, invokeArgs); - writeObj = oldVal; + evtVal = entry.modified() ? + cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal; + } + catch (Exception ignore) { + evtVal = prevVal; + } } - } - - newEntry = new GridCacheLazyPlainVersionedEntry<>( - cctx, - key, - (CacheObject)writeObj, - expiration.get1(), - expiration.get2(), - conflictVer != null ? conflictVer : newVer, - keepBinary); - - // Resolve conflict. - conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck); - - assert conflictCtx != null; - - boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY; - - // Use old value? - if (conflictCtx.isUseOld()) { - GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer; + else + evtVal = (CacheObject)writeObj; - // Handle special case with atomic comparator. - if (!isNew() && // Not initial value, - verCheck && // and atomic version check, - oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() && // and data centers are equal, - ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, ignoreTime) == 0 && // and both versions are equal, - cctx.writeThrough() && // and store is enabled, - primary) // and we are primary. - { - CacheObject val = this.val; + long updateCntr0 = nextPartCounter(); - if (val == null) { - assert deletedUnlocked(); + if (updateCntr != null) + updateCntr0 = updateCntr; - cctx.store().remove(null, key); - } - else - cctx.store().put(null, key, val, ver); - } + onUpdateFinished(updateCntr0); - return new GridCacheUpdateAtomicResult(false, - retval ? this.val : null, - null, - invokeRes, - CU.TTL_ETERNAL, - CU.EXPIRE_TIME_ETERNAL, - null, - null, + cctx.continuousQueries().onEntryUpdated( + key, + evtVal, + prevVal, + isInternal() || !context().userCache(), + partition(), + primary, false, - updateCntr0 == null ? 0 : updateCntr0); + updateCntr0, + null, + topVer); } - // Will update something. - else { - // Merge is a local update which override passed value bytes. - if (conflictCtx.isMerge()) { - writeObj = cctx.toCacheObject(conflictCtx.mergeValue()); - - conflictVer = null; - } - else - assert conflictCtx.isUseNew(); - // Update value is known at this point, so update operation type. - op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE; - } + return updateRes; } - else - // Nullify conflict version on this update, so that we will use regular version during next updates. 
- conflictVer = null; - } - - boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY; - - // Perform version check only in case there was no explicit conflict resolution. - if (conflictCtx == null) { - if (verCheck) { - if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) >= 0) { - if (ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) == 0 && cctx.writeThrough() && primary) { - if (log.isDebugEnabled()) - log.debug("Received entry update with same version as current (will update store) " + - "[entry=" + this + ", newVer=" + newVer + ']'); - CacheObject val = this.val; + case CONFLICT_USE_OLD: + case FILTER_FAILED: + case INVOKE_NO_OP: + case INTERCEPTOR_CANCEL: + return updateRes; + } - if (val == null) { - assert deletedUnlocked(); + assert updateRes.outcome() == UpdateOutcome.SUCCESS || updateRes.outcome() == UpdateOutcome.REMOVE_NO_VAL; - cctx.store().remove(null, key); - } - else - cctx.store().put(null, key, val, ver); - } - else { - if (log.isDebugEnabled()) - log.debug("Received entry update with smaller version than current (will ignore) " + - "[entry=" + this + ", newVer=" + newVer + ']'); - } + CacheObject evtOld = null; - if (!cctx.isNear()) { - CacheObject evtVal; + if (evt && op == TRANSFORM && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { + assert writeObj instanceof EntryProcessor : writeObj; - if (op == GridCacheOperation.TRANSFORM) { - EntryProcessor<Object, Object, ?> entryProcessor = - (EntryProcessor<Object, Object, ?>)writeObj; + evtOld = cctx.unwrapTemporary(oldVal); - CacheInvokeEntry<Object, Object> entry = - new CacheInvokeEntry<>(key, prevVal, version(), keepBinary, this); + Object transformClo = EntryProcessorResourceInjectorProxy.unwrap(writeObj); - try { - entryProcessor.process(entry, invokeArgs); + cctx.events().addEvent(partition(), + key, + evtNodeId, + null, + newVer, + EVT_CACHE_OBJECT_READ, + evtOld, evtOld != null, + evtOld, evtOld != null, + subjId, + transformClo.getClass().getName(), + taskName, + keepBinary); + } - evtVal = entry.modified() ? - cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal; - } - catch (Exception ignore) { - evtVal = prevVal; - } - } - else - evtVal = (CacheObject)writeObj; - - updateCntr0 = nextPartCounter(topVer); - - if (updateCntr != null) - updateCntr0 = updateCntr; - - onUpdateFinished(updateCntr0); - - cctx.continuousQueries().onEntryUpdated( - key, - evtVal, - prevVal, - isInternal() || !context().userCache(), - partition(), - primary, - false, - updateCntr0, - null, - topVer); - } + if (c.op == GridCacheOperation.UPDATE) { + updateVal = val; - return new GridCacheUpdateAtomicResult(false, - retval ? this.val : null, - null, - invokeRes, - CU.TTL_ETERNAL, - CU.EXPIRE_TIME_ETERNAL, - null, - null, - false, - 0); - } - } - else - assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) <= 0 : - "Invalid version for inner update [isNew=" + isNew() + ", entry=" + this + ", newVer=" + newVer + ']'; - } + assert updateVal != null : c; - // Apply metrics. - if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) { - // PutIfAbsent methods mustn't update hit/miss statistics - if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || !cctx.putIfAbsentFilter(filter)) - cctx.cache().metrics0().onRead(oldVal != null); - } + drReplicate(drType, updateVal, updateVer, topVer); - // Check filter inside of synchronization. 
- if (!F.isEmptyOrNulls(filter)) { - boolean pass = cctx.isAllLocked(this, filter); + recordNodeId(affNodeId, topVer); - if (!pass) { - if (expiryPlc != null && !readFromStore && hasValueUnlocked() && !cctx.putIfAbsentFilter(filter)) - updateTtl(expiryPlc); + if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) { + if (evtOld == null) + evtOld = cctx.unwrapTemporary(oldVal); - return new GridCacheUpdateAtomicResult(false, - retval ? oldVal : null, - null, - invokeRes, - CU.TTL_ETERNAL, - CU.EXPIRE_TIME_ETERNAL, + cctx.events().addEvent(partition(), + key, + evtNodeId, null, + newVer, + EVT_CACHE_OBJECT_PUT, + updateVal, + true, + evtOld, + evtOld != null, + subjId, null, - false, - updateCntr0 == null ? 0 : updateCntr0); + taskName, + keepBinary); } } + else { + assert c.op == GridCacheOperation.DELETE : c.op; - // Calculate new value in case we met transform. - if (op == GridCacheOperation.TRANSFORM) { - assert conflictCtx == null : "Cannot be TRANSFORM here if conflict resolution was performed earlier."; - - transformClo = writeObj; - - EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj; - - CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(key, oldVal, version(), keepBinary, this); - - try { - Object computed = entryProcessor.process(entry, invokeArgs); - - if (entry.modified()) { - updated0 = cctx.unwrapTemporary(entry.getValue()); - updated = cctx.toCacheObject(updated0); - } - else - updated = oldVal; - - key0 = entry.key(); + clearReaders(); - if (computed != null) - invokeRes = new IgniteBiTuple(cctx.unwrapTemporary(computed), null); - } - catch (Exception e) { - invokeRes = new IgniteBiTuple(null, e); + drReplicate(drType, null, newVer, topVer); - updated = oldVal; - } + recordNodeId(affNodeId, topVer); - if (!entry.modified()) { - if (expiryPlc != null && !readFromStore && hasValueUnlocked()) - updateTtl(expiryPlc); + if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) { + if (evtOld == null) + evtOld = cctx.unwrapTemporary(oldVal); - return new GridCacheUpdateAtomicResult(false, - retval ? oldVal : null, - null, - invokeRes, - CU.TTL_ETERNAL, - CU.EXPIRE_TIME_ETERNAL, - null, + cctx.events().addEvent(partition(), + key, + evtNodeId, + null, newVer, + EVT_CACHE_OBJECT_REMOVED, + null, false, + evtOld, evtOld != null, + subjId, null, - false, - updateCntr0 == null ? 0 : updateCntr0); + taskName, + keepBinary); } } - else - updated = (CacheObject)writeObj; - - op = updated == null ? GridCacheOperation.DELETE : GridCacheOperation.UPDATE; - - assert op == GridCacheOperation.UPDATE || (op == GridCacheOperation.DELETE && updated == null); - - boolean hadVal = hasValueUnlocked(); - - // Incorporate conflict version into new version if needed. - if (conflictVer != null && conflictVer != newVer) - newVer = new GridCacheVersionEx(newVer.topologyVersion(), - newVer.globalTime(), - newVer.order(), - newVer.nodeOrder(), - newVer.dataCenterId(), - conflictVer); - if (op == GridCacheOperation.UPDATE) { - // Conflict context is null if there were no explicit conflict resolution. - if (conflictCtx == null) { - // Calculate TTL and expire time for local update. - if (explicitTtl != CU.TTL_NOT_CHANGED) { - // If conflict existed, expire time must be explicit. - assert conflictVer == null || explicitExpireTime != CU.EXPIRE_TIME_CALCULATE; - - newSysTtl = newTtl = explicitTtl; - newSysExpireTime = explicitExpireTime; - - newExpireTime = explicitExpireTime != CU.EXPIRE_TIME_CALCULATE ? 
- explicitExpireTime : CU.toExpireTime(explicitTtl); - } - else { - newSysTtl = expiryPlc == null ? CU.TTL_NOT_CHANGED : - hadVal ? expiryPlc.forUpdate() : expiryPlc.forCreate(); + if (updateRes.success()) + updateMetrics(c.op, metrics); - if (newSysTtl == CU.TTL_NOT_CHANGED) { - newSysExpireTime = CU.EXPIRE_TIME_CALCULATE; - newTtl = ttlExtras(); - newExpireTime = expireTimeExtras(); - } - else if (newSysTtl == CU.TTL_ZERO) { - op = GridCacheOperation.DELETE; + // Continuous query filter should be perform under lock. + if (lsnrs != null) { + CacheObject evtVal = cctx.unwrapTemporary(updateVal); + CacheObject evtOldVal = cctx.unwrapTemporary(oldVal); - newSysTtl = CU.TTL_NOT_CHANGED; - newSysExpireTime = CU.EXPIRE_TIME_CALCULATE; + cctx.continuousQueries().onEntryUpdated(lsnrs, + key, + evtVal, + evtOldVal, + internal, + partition(), + primary, + false, + c.updateRes.updateCounter(), + fut, + topVer); + } - newTtl = CU.TTL_ETERNAL; - newExpireTime = CU.EXPIRE_TIME_ETERNAL; + cctx.dataStructures().onEntryUpdated(key, c.op == GridCacheOperation.DELETE, keepBinary); - updated = null; - } - else { - newSysExpireTime = CU.EXPIRE_TIME_CALCULATE; - newTtl = newSysTtl; - newExpireTime = CU.toExpireTime(newTtl); - } - } + if (intercept) { + if (c.op == GridCacheOperation.UPDATE) { + cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry( + cctx, + key, + null, + updateVal, + null, + keepBinary, + c.updateRes.updateCounter())); } else { - newSysTtl = newTtl = conflictCtx.ttl(); - newSysExpireTime = newExpireTime = conflictCtx.expireTime(); + assert c.op == GridCacheOperation.DELETE : c.op; + + cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry( + cctx, + key, + null, + oldVal, + null, + keepBinary, + c.updateRes.updateCounter())); } } - else { - assert op == GridCacheOperation.DELETE; - - newSysTtl = CU.TTL_NOT_CHANGED; - newSysExpireTime = CU.EXPIRE_TIME_CALCULATE; + } - newTtl = CU.TTL_ETERNAL; - newExpireTime = CU.EXPIRE_TIME_ETERNAL; - } + onUpdateFinished(c.updateRes.updateCounter()); - // TTL and expire time must be resolved at this point. - assert newTtl != CU.TTL_NOT_CHANGED && newTtl != CU.TTL_ZERO && newTtl >= 0; - assert newExpireTime != CU.EXPIRE_TIME_CALCULATE && newExpireTime >= 0; + return c.updateRes; + } - IgniteBiTuple<Boolean, Object> interceptRes = null; + /** + * @param val Value. + * @param cacheObj Cache object. + * @param keepBinary Keep binary flag. + * @param cpy Copy flag. + * @return Cache object value. + */ + @Nullable private Object value(@Nullable Object val, @Nullable CacheObject cacheObj, boolean keepBinary, boolean cpy) { + if (val != null) + return val; - // Actual update. - if (op == GridCacheOperation.UPDATE) { - if (log.isTraceEnabled()) { - log.trace("innerUpdate [key=" + key + - ", entry=" + System.identityHashCode(this) + ']'); - } + return cctx.unwrapBinaryIfNeeded(cacheObj, keepBinary, cpy); + } - if (intercept) { - updated0 = value(updated0, updated, keepBinary, false); + /** + * @param expiry Expiration policy. + * @return Tuple holding initial TTL and expire time with the given expiry. + */ + private static IgniteBiTuple<Long, Long> initialTtlAndExpireTime(IgniteCacheExpiryPolicy expiry) { + assert expiry != null; - Object interceptorVal = cctx.config().getInterceptor() - .onBeforePut(new CacheLazyEntry(cctx, key, key0, oldVal, old0, keepBinary), updated0); - - if (interceptorVal == null) - return new GridCacheUpdateAtomicResult(false, - retval ? 
oldVal : null, - null, - invokeRes, - CU.TTL_ETERNAL, - CU.EXPIRE_TIME_ETERNAL, - null, - null, - false, - updateCntr0 == null ? 0 : updateCntr0); - else if (interceptorVal != updated0) { - updated0 = cctx.unwrapTemporary(interceptorVal); - - updated = cctx.toCacheObject(updated0); - } - } - - // Try write-through. - if (writeThrough) - // Must persist inside synchronization in non-tx mode. - cctx.store().put(null, key, updated, newVer); - - if (!hadVal) { - boolean new0 = isNew(); - - assert deletedUnlocked() || new0 || isInternal(): "Invalid entry [entry=" + this + ", locNodeId=" + - cctx.localNodeId() + ']'; - - if (!new0 && !isInternal()) - deletedUnlocked(false); - } - else { - assert !deletedUnlocked() : "Invalid entry [entry=" + this + - ", locNodeId=" + cctx.localNodeId() + ']'; - - // Do not change size. - } - - updated = cctx.kernalContext().cacheObjects().prepareForCache(updated, cctx); - - updateCntr0 = nextPartCounter(topVer); - - if (updateCntr != null) - updateCntr0 = updateCntr; - - logUpdate(op, updated, newVer, newExpireTime, updateCntr0); - - storeValue(updated, newExpireTime, newVer, oldRow); - - update(updated, newExpireTime, newTtl, newVer, true); - - drReplicate(drType, updated, newVer, topVer); - - recordNodeId(affNodeId, topVer); - - if (evt) { - CacheObject evtOld = null; - - if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { - evtOld = cctx.unwrapTemporary(oldVal); - - transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo); - - cctx.events().addEvent(partition(), key, evtNodeId, null, - newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld, - evtOld != null || hadVal, subjId, transformClo.getClass().getName(), taskName, - keepBinary); - } - - if (newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) { - if (evtOld == null) - evtOld = cctx.unwrapTemporary(oldVal); - - cctx.events().addEvent(partition(), key, evtNodeId, null, - newVer, EVT_CACHE_OBJECT_PUT, updated, updated != null, evtOld, - evtOld != null || hadVal, subjId, null, taskName, keepBinary); - } - } - } - else { - if (intercept) { - interceptRes = cctx.config().getInterceptor().onBeforeRemove(new CacheLazyEntry(cctx, key, key0, - oldVal, old0, keepBinary, updateCntr0)); - - if (cctx.cancelRemove(interceptRes)) - return new GridCacheUpdateAtomicResult(false, - cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())), - null, - invokeRes, - CU.TTL_ETERNAL, - CU.EXPIRE_TIME_ETERNAL, - null, - null, - false, - updateCntr0 == null ? 0 : updateCntr0); - } - - if (writeThrough) - // Must persist inside synchronization in non-tx mode. - cctx.store().remove(null, key); - - updateCntr0 = nextPartCounter(topVer); - - if (updateCntr != null) - updateCntr0 = updateCntr; - - logUpdate(op, null, newVer, 0, updateCntr0); - - removeValue(); - - if (hadVal) { - assert !deletedUnlocked(); - - if (!isInternal()) - deletedUnlocked(true); - } - else { - boolean new0 = isNew(); - - assert deletedUnlocked() || new0 || isInternal() : "Invalid entry [entry=" + this + ", locNodeId=" + - cctx.localNodeId() + ']'; - - if (new0) { - if (!isInternal()) - deletedUnlocked(true); - } - } - - enqueueVer = newVer; - - // Clear value on backup. Entry will be removed from cache when it got evicted from queue. 
- update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer, true); - - assert newSysTtl == CU.TTL_NOT_CHANGED; - assert newSysExpireTime == CU.EXPIRE_TIME_CALCULATE; - - clearReaders(); - - recordNodeId(affNodeId, topVer); - - drReplicate(drType, null, newVer, topVer); - - if (evt) { - CacheObject evtOld = null; - - if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { - evtOld = cctx.unwrapTemporary(oldVal); - - transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo); - - cctx.events().addEvent(partition(), key, evtNodeId, null, - newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld, - evtOld != null || hadVal, subjId, transformClo.getClass().getName(), taskName, - keepBinary); - } - - if (newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) { - if (evtOld == null) - evtOld = cctx.unwrapTemporary(oldVal); - - cctx.events().addEvent(partition(), key, evtNodeId, null, newVer, - EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hadVal, - subjId, null, taskName, keepBinary); - } - } - - res = hadVal; - } - - if (res) - updateMetrics(op, metrics); - - // Continuous query filter should be perform under lock. - if (lsnrs != null) { - CacheObject evtVal = cctx.unwrapTemporary(updated); - CacheObject evtOldVal = cctx.unwrapTemporary(oldVal); - - cctx.continuousQueries().onEntryUpdated(lsnrs, key, evtVal, evtOldVal, internal, - partition(), primary, false, updateCntr0, fut, topVer); - } - - cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary); - - if (intercept) { - if (op == GridCacheOperation.UPDATE) - cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry( - cctx, - key, - key0, - updated, - updated0, - keepBinary, - updateCntr0)); - else - cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry( - cctx, - key, - key0, - oldVal, - old0, - keepBinary, - updateCntr0)); - - if (interceptRes != null) - oldVal = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())); - } - } - - onUpdateFinished(updateCntr0); - - if (log.isDebugEnabled()) - log.debug("Updated cache entry [val=" + val + ", old=" + oldVal + ", entry=" + this + ']'); - - return new GridCacheUpdateAtomicResult(res, - oldVal, - updated, - invokeRes, - newSysTtl, - newSysExpireTime, - enqueueVer, - conflictCtx, - true, - updateCntr0 == null ? 0 : updateCntr0); - } - - /** - * @param val Value. - * @param cacheObj Cache object. - * @param keepBinary Keep binary flag. - * @param cpy Copy flag. - * @return Cache object value. - */ - @Nullable private Object value(@Nullable Object val, @Nullable CacheObject cacheObj, boolean keepBinary, boolean cpy) { - if (val != null) - return val; - - return cctx.unwrapBinaryIfNeeded(cacheObj, keepBinary, cpy); - } - - /** - * @param expiry Expiration policy. - * @return Tuple holding initial TTL and expire time with the given expiry. - */ - private static IgniteBiTuple<Long, Long> initialTtlAndExpireTime(IgniteCacheExpiryPolicy expiry) { - assert expiry != null; - - long initTtl = expiry.forCreate(); - long initExpireTime; + long initTtl = expiry.forCreate(); + long initExpireTime; if (initTtl == CU.TTL_ZERO) { initTtl = CU.TTL_MINIMUM; @@ -2294,8 +1870,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * @param expireTime Explicit expire time. * @return Result. 
*/ - private GridTuple3<Long, Long, Boolean> ttlAndExpireTime(IgniteCacheExpiryPolicy expiry, long ttl, long expireTime) - throws GridCacheEntryRemovedException { + private GridTuple3<Long, Long, Boolean> ttlAndExpireTime(IgniteCacheExpiryPolicy expiry, long ttl, long expireTime) { + assert !obsolete(); + boolean rmv = false; // 1. If TTL is not changed, then calculate it based on expiry. @@ -2313,7 +1890,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // 3. If TTL is still not changed, then either use old entry TTL or set it to "ETERNAL". if (ttl == CU.TTL_NOT_CHANGED) { - if (isNew()) + if (isStartVersion()) ttl = CU.TTL_ETERNAL; else { ttl = ttlExtras(); @@ -3027,6 +2604,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** + * @return Update counter. + */ + protected long nextPartCounter() { + return 0; + } + + /** * @param topVer Topology version. * @return Update counter. */ @@ -3566,13 +3150,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme assert Thread.holdsLock(this); assert val != null : "null values in update for key: " + key; - cctx.offheap().update(key, - val, - ver, - expireTime, - partition(), - localPartition(), - oldRow); + cctx.offheap().invoke(key, localPartition(), new UpdateClosure(this, val, ver, expireTime)); } /** @@ -4177,6 +3755,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** * @param key Key. + * @param keepBinary Keep binary flag. */ private LazyValueEntry(KeyCacheObject key, boolean keepBinary) { this.key = key; @@ -4223,4 +3802,854 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme return "IteratorEntry [key=" + key + ']'; } } + + /** + * + */ + private static class UpdateClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure { + /** */ + private final GridCacheMapEntry entry; + + /** */ + private final CacheObject val; + + /** */ + private final GridCacheVersion ver; + + /** */ + private final long expireTime; + + /** */ + private CacheDataRow newRow; + + /** */ + private CacheDataRow oldRow; + + /** */ + private IgniteTree.OperationType treeOp = IgniteTree.OperationType.PUT; + + /** + * @param entry Entry. + * @param val New value. + * @param ver New version. + * @param expireTime New expire time. + */ + UpdateClosure(GridCacheMapEntry entry, CacheObject val, GridCacheVersion ver, long expireTime) { + this.entry = entry; + this.val = val; + this.ver = ver; + this.expireTime = expireTime; + } + + /** {@inheritDoc} */ + @Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException { + this.oldRow = oldRow; + + if (oldRow != null) + oldRow.key(entry.key); + + newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow(entry.key, + val, + ver, + expireTime, + oldRow); + + treeOp = oldRow != null && oldRow.link() == newRow.link() ? 
+ IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT; + } + + /** {@inheritDoc} */ + @Override public CacheDataRow newRow() { + return newRow; + } + + /** {@inheritDoc} */ + @Override public IgniteTree.OperationType operationType() { + return treeOp; + } + + /** {@inheritDoc} */ + @Nullable @Override public CacheDataRow oldRow() { + return oldRow; + } + } + + /** + * + */ + private static class AtomicCacheUpdateClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure { + /** */ + private final GridCacheMapEntry entry; + + /** */ + private GridCacheVersion newVer; + + /** */ + private GridCacheOperation op; + + /** */ + private Object writeObj; + + /** */ + private Object[] invokeArgs; + + /** */ + private final boolean readThrough; + + /** */ + private final boolean writeThrough; + + /** */ + private final boolean keepBinary; + + /** */ + private final IgniteCacheExpiryPolicy expiryPlc; + + /** */ + private final boolean primary; + + /** */ + private final boolean verCheck; + + /** */ + private final CacheEntryPredicate[] filter; + + /** */ + private final long explicitTtl; + + /** */ + private final long explicitExpireTime; + + /** */ + private GridCacheVersion conflictVer; + + /** */ + private final boolean conflictResolve; + + /** */ + private final boolean intercept; + + /** */ + private final Long updateCntr; + + /** */ + private GridCacheUpdateAtomicResult updateRes; + + /** */ + private IgniteTree.OperationType treeOp; + + /** */ + private CacheDataRow newRow; + + /** */ + private CacheDataRow oldRow; + + AtomicCacheUpdateClosure(GridCacheMapEntry entry, + GridCacheVersion newVer, + GridCacheOperation op, + Object writeObj, + Object[] invokeArgs, + boolean readThrough, + boolean writeThrough, + boolean keepBinary, + @Nullable IgniteCacheExpiryPolicy expiryPlc, + boolean primary, + boolean verCheck, + @Nullable CacheEntryPredicate[] filter, + long explicitTtl, + long explicitExpireTime, + @Nullable GridCacheVersion conflictVer, + boolean conflictResolve, + boolean intercept, + @Nullable Long updateCntr) { + assert op == UPDATE || op == DELETE || op == TRANSFORM : op; + + this.entry = entry; + this.newVer = newVer; + this.op = op; + this.writeObj = writeObj; + this.invokeArgs = invokeArgs; + this.readThrough = readThrough; + this.writeThrough = writeThrough; + this.keepBinary = keepBinary; + this.expiryPlc = expiryPlc; + this.primary = primary; + this.verCheck = verCheck; + this.filter = filter; + this.explicitTtl = explicitTtl; + this.explicitExpireTime = explicitExpireTime; + this.conflictVer = conflictVer; + this.conflictResolve = conflictResolve; + this.intercept = intercept; + this.updateCntr = updateCntr; + + switch (op) { + case UPDATE: + treeOp = IgniteTree.OperationType.PUT; + + break; + + case DELETE: + treeOp = IgniteTree.OperationType.REMOVE; + + break; + } + } + + /** {@inheritDoc} */ + @Nullable @Override public CacheDataRow oldRow() { + return oldRow; + } + + /** {@inheritDoc} */ + @Override public CacheDataRow newRow() { + return newRow; + } + + /** {@inheritDoc} */ + @Override public IgniteTree.OperationType operationType() { + return treeOp; + } + + /** {@inheritDoc} */ + @Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException { + assert entry.isNear() || oldRow == null || oldRow.link() != 0 : oldRow; + + if (oldRow != null) + oldRow.key(entry.key()); + + this.oldRow = oldRow; + + GridCacheContext cctx = entry.context(); + + CacheObject oldVal; + CacheObject storeLoadedVal = null; + + if (oldRow != null) { + oldVal = 
oldRow.value(); + + entry.update(oldVal, oldRow.expireTime(), 0, oldRow.version(), false); + } + else + oldVal = null; + + if (oldVal == null && readThrough) { + storeLoadedVal = cctx.toCacheObject(cctx.store().load(null, entry.key)); + + if (storeLoadedVal != null) { + oldVal = cctx.kernalContext().cacheObjects().prepareForCache(storeLoadedVal, cctx); + + entry.val = oldVal; + + if (entry.deletedUnlocked()) + entry.deletedUnlocked(false); + } + } + + CacheInvokeEntry<Object, Object> invokeEntry = null; + IgniteBiTuple<Object, Exception> invokeRes = null; + + boolean invoke = op == TRANSFORM; + + if (invoke) { + invokeEntry = new CacheInvokeEntry<>(entry.key, oldVal, entry.ver, keepBinary, entry); + + invokeRes = runEntryProcessor(invokeEntry); + + op = writeObj == null ? DELETE : UPDATE; + } + + CacheObject newVal = (CacheObject)writeObj; + + GridCacheVersionConflictContext<?, ?> conflictCtx = null; + + if (conflictResolve) { + conflictCtx = resolveConflict(newVal, invokeRes); + + if (updateRes != null) { + assert conflictCtx != null && conflictCtx.isUseOld() : conflictCtx; + assert treeOp == IgniteTree.OperationType.NOOP : treeOp; + + return; + } + } + + if (conflictCtx == null) { + // Perform version check only in case there was no explicit conflict resolution. + versionCheck(invokeRes); + + if (updateRes != null) { + assert treeOp == IgniteTree.OperationType.NOOP : treeOp; + + return; + } + } + + if (!F.isEmptyOrNulls(filter)) { + boolean pass = cctx.isAllLocked(entry, filter); + + if (!pass) { + initResultOnCancelUpdate(storeLoadedVal, !cctx.putIfAbsentFilter(filter)); + + updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.FILTER_FAILED, + oldVal, + null, + invokeRes, + CU.TTL_ETERNAL, + CU.EXPIRE_TIME_ETERNAL, + null, + null, + 0); + + return; + } + } + + if (invoke) { + if (!invokeEntry.modified()) { + initResultOnCancelUpdate(storeLoadedVal, true); + + updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.INVOKE_NO_OP, + oldVal, + null, + invokeRes, + CU.TTL_ETERNAL, + CU.EXPIRE_TIME_ETERNAL, + null, + null, + 0); + + return; + } + + op = writeObj == null ? DELETE : UPDATE; + } + + // Incorporate conflict version into new version if needed. + if (conflictVer != null && conflictVer != newVer) { + newVer = new GridCacheVersionEx(newVer.topologyVersion(), + newVer.globalTime(), + newVer.order(), + newVer.nodeOrder(), + newVer.dataCenterId(), + conflictVer); + } + + if (op == UPDATE) { + assert writeObj != null; + + update(conflictCtx, invokeRes, storeLoadedVal != null); + } + else { + assert op == DELETE && writeObj == null : op; + + remove(conflictCtx, invokeRes, storeLoadedVal != null); + } + + assert updateRes != null && treeOp != null; + } + + /** + * @param storeLoadedVal Value loaded from store. + * @param updateExpireTime {@code True} if need update expire time. + * @throws IgniteCheckedException If failed. 
+ */ + private void initResultOnCancelUpdate(@Nullable CacheObject storeLoadedVal, boolean updateExpireTime) + throws IgniteCheckedException { + boolean needUpdate = false; + + if (storeLoadedVal != null) { + long initTtl; + long initExpireTime; + + if (expiryPlc != null) { + IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc); + + initTtl = initTtlAndExpireTime.get1(); + initExpireTime = initTtlAndExpireTime.get2(); + } + else { + initTtl = CU.TTL_ETERNAL; + initExpireTime = CU.EXPIRE_TIME_ETERNAL; + } + + entry.update(storeLoadedVal, initExpireTime, initTtl, entry.ver, true); + + needUpdate = true; + } + else if (updateExpireTime && expiryPlc != null && entry.val != null){ + long ttl = expiryPlc.forAccess(); + + if (ttl != CU.TTL_NOT_CHANGED) { + long expireTime; + + if (ttl == CU.TTL_ZERO) { + ttl = CU.TTL_MINIMUM; + expireTime = CU.expireTimeInPast(); + } + else + expireTime = CU.toExpireTime(ttl); + + if (entry.expireTimeExtras() != expireTime) { + entry.update(entry.val, expireTime, ttl, entry.ver, true); + + expiryPlc.ttlUpdated(entry.key, entry.ver, null); + + needUpdate = true; + } + } + } + + if (needUpdate) { + newRow = entry.localPartition().dataStore().createRow(entry.key, + storeLoadedVal, + newVer, + entry.expireTimeExtras(), + oldRow); + + treeOp = IgniteTree.OperationType.PUT; + } + else + treeOp = IgniteTree.OperationType.NOOP; + } + + /** + * @param conflictCtx Conflict context. + * @param invokeRes Entry processor result (for invoke operation). + * @param readFromStore {@code True} if initial entry value was {@code null} and it was read from store. + * @throws IgniteCheckedException If failed. + */ + private void update(@Nullable GridCacheVersionConflictContext<?, ?> conflictCtx, + @Nullable IgniteBiTuple<Object, Exception> invokeRes, + boolean readFromStore) + throws IgniteCheckedException + { + GridCacheContext cctx = entry.context(); + + final CacheObject oldVal = entry.val; + CacheObject updated = (CacheObject)writeObj; + + long newSysTtl; + long newSysExpireTime; + + long newTtl; + long newExpireTime; + + // Conflict context is null if there were no explicit conflict resolution. + if (conflictCtx == null) { + // Calculate TTL and expire time for local update. + if (explicitTtl != CU.TTL_NOT_CHANGED) { + // If conflict existed, expire time must be explicit. + assert conflictVer == null || explicitExpireTime != CU.EXPIRE_TIME_CALCULATE; + + newSysTtl = newTtl = explicitTtl; + newSysExpireTime = explicitExpireTime; + + newExpireTime = explicitExpireTime != CU.EXPIRE_TIME_CALCULATE ? + explicitExpireTime : CU.toExpireTime(explicitTtl); + } + else { + newSysTtl = expiryPlc == null ? CU.TTL_NOT_CHANGED : + entry.val != null ? 
expiryPlc.forUpdate() : expiryPlc.forCreate(); + + if (newSysTtl == CU.TTL_NOT_CHANGED) { + newSysExpireTime = CU.EXPIRE_TIME_CALCULATE; + newTtl = entry.ttlExtras(); + newExpireTime = entry.expireTimeExtras(); + } + else if (newSysTtl == CU.TTL_ZERO) { + op = GridCacheOperation.DELETE; + + writeObj = null; + + remove(conflictCtx, invokeRes, readFromStore); + + return; + } + else { + newSysExpireTime = CU.EXPIRE_TIME_CALCULATE; + newTtl = newSysTtl; + newExpireTime = CU.toExpireTime(newTtl); + } + } + } + else { + newSysTtl = newTtl = conflictCtx.ttl(); + newSysExpireTime = newExpireTime = conflictCtx.expireTime(); + } + + if (intercept) { + Object updated0 = cctx.unwrapBinaryIfNeeded(updated, keepBinary, false); + + CacheLazyEntry<Object, Object> interceptEntry = new CacheLazyEntry<>(cctx, + entry.key, + null, + oldVal, + null, + keepBinary); + + Object interceptorVal = cctx.config().getInterceptor().onBeforePut(interceptEntry, updated0); + + if (interceptorVal == null) { + treeOp = IgniteTree.OperationType.NOOP; + + updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.INTERCEPTOR_CANCEL, + oldVal, + null, + invokeRes, + CU.TTL_ETERNAL, + CU.EXPIRE_TIME_ETERNAL, + null, + null, + 0); + + return; + } + else if (interceptorVal != updated0) { + updated0 = cctx.unwrapTemporary(interceptorVal); + + updated = cctx.toCacheObject(updated0); + } + } + + updated = cctx.kernalContext().cacheObjects().prepareForCache(updated, cctx); + + if (writeThrough) + // Must persist inside synchronization in non-tx mode. + cctx.store().put(null, entry.key, updated, newVer); + + if (entry.val == null) { + boolean new0 = entry.isStartVersion(); + + assert entry.deletedUnlocked() || new0 || entry.isInternal(): "Invalid entry [entry=" + entry + + ", locNodeId=" + cctx.localNodeId() + ']'; + + if (!new0 && !entry.isInternal()) + entry.deletedUnlocked(false); + } + else { + assert !entry.deletedUnlocked() : "Invalid entry [entry=" + this + + ", locNodeId=" + cctx.localNodeId() + ']'; + } + + long updateCntr0 = entry.nextPartCounter(); + + if (updateCntr != null) + updateCntr0 = updateCntr; + + entry.logUpdate(op, updated, newVer, newExpireTime, updateCntr0); + + if (!entry.isNear()) { + newRow = entry.localPartition().dataStore().createRow(entry.key, + updated, + newVer, + newExpireTime, + oldRow); + + treeOp = oldRow != null && oldRow.link() == newRow.link() ? + IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT; + } + else + treeOp = IgniteTree.OperationType.PUT; + + entry.update(updated, newExpireTime, newTtl, newVer, true); + + updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.SUCCESS, + oldVal, + updated, + invokeRes, + newSysTtl, + newSysExpireTime, + null, + conflictCtx, + updateCntr0); + } + + /** + * @param conflictCtx Conflict context. + * @param invokeRes Entry processor result (for invoke operation). + * @param readFromStore {@code True} if initial entry value was {@code null} and it was read from store. + * @throws IgniteCheckedException If failed. 
+ */ + @SuppressWarnings("unchecked") + private void remove(@Nullable GridCacheVersionConflictContext<?, ?> conflictCtx, + @Nullable IgniteBiTuple<Object, Exception> invokeRes, + boolean readFromStore) + throws IgniteCheckedException + { + GridCacheContext cctx = entry.context(); + + CacheObject oldVal = entry.val; + + IgniteBiTuple<Boolean, Object> interceptRes = null; + + if (intercept) { + CacheLazyEntry<Object, Object> intercepEntry = new CacheLazyEntry<>(cctx, + entry.key, + null, + oldVal, + null, + keepBinary); + + interceptRes = cctx.config().getInterceptor().onBeforeRemove(intercepEntry); + + if (cctx.cancelRemove(interceptRes)) { + treeOp = IgniteTree.OperationType.NOOP; + + updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.INTERCEPTOR_CANCEL, + cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())), + null, + invokeRes, + CU.TTL_ETERNAL, + CU.EXPIRE_TIME_ETERNAL, + null, + null, + 0); + + return; + } + } + + if (writeThrough) + // Must persist inside synchronization in non-tx mode. + cctx.store().remove(null, entry.key); + + long updateCntr0 = entry.nextPartCounter(); + + if (updateCntr != null) + updateCntr0 = updateCntr; + + if (oldVal != null) { + assert !entry.deletedUnlocked(); + + if (!entry.isInternal()) + entry.deletedUnlocked(true); + } + else { + boolean new0 = entry.isStartVersion(); + + assert entry.deletedUnlocked() || new0 || entry.isInternal() : "Invalid entry [entry=" + this + + ", locNodeId=" + cctx.localNodeId() + ']'; + + if (new0) { + if (!entry.isInternal()) + entry.deletedUnlocked(true); + } + } + + GridCacheVersion enqueueVer = newVer; + + entry.update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer, true); + + treeOp = (oldVal == null || readFromStore) ? IgniteTree.OperationType.NOOP : + IgniteTree.OperationType.REMOVE; + + UpdateOutcome outcome = oldVal != null ? UpdateOutcome.SUCCESS : UpdateOutcome.REMOVE_NO_VAL; + + if (interceptRes != null) + oldVal = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())); + + updateRes = new GridCacheUpdateAtomicResult(outcome, + oldVal, + null, + invokeRes, + CU.TTL_NOT_CHANGED, + CU.EXPIRE_TIME_CALCULATE, + enqueueVer, + conflictCtx, + updateCntr0); + } + + /** + * @param newVal New entry value. + * @param invokeRes Entry processor result (for invoke operation). + * @return Conflict context. + * @throws IgniteCheckedException If failed. + */ + private GridCacheVersionConflictContext<?, ?> resolveConflict( + CacheObject newVal, + @Nullable IgniteBiTuple<Object, Exception> invokeRes) + throws IgniteCheckedException + { + GridCacheContext cctx = entry.context(); + + // Cache is conflict-enabled. + if (cctx.conflictNeedResolve()) { + GridCacheVersion oldConflictVer = entry.ver.conflictVersion(); + + // Prepare old and new entries for conflict resolution. + GridCacheVersionedEntryEx oldEntry = new GridCacheLazyPlainVersionedEntry<>(cctx, + entry.key, + entry.val, + entry.ttlExtras(), + entry.expireTimeExtras(), + entry.ver.conflictVersion(), + entry.isStartVersion(), + keepBinary); + + GridTuple3<Long, Long, Boolean> expiration = entry.ttlAndExpireTime(expiryPlc, + explicitTtl, + explicitExpireTime); + + GridCacheVersionedEntryEx newEntry = new GridCacheLazyPlainVersionedEntry<>( + cctx, + entry.key, + newVal, + expiration.get1(), + expiration.get2(), + conflictVer != null ? conflictVer : newVer, + keepBinary); + + // Resolve conflict. 
+ GridCacheVersionConflictContext<?, ?> conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck); + + assert conflictCtx != null; + + boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY; + + // Use old value? + if (conflictCtx.isUseOld()) { + GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer; + + // Handle special case with atomic comparator. + if (!entry.isStartVersion() && // Not initial value, + verCheck && // and atomic version check, + oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() && // and data centers are equal, + ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, ignoreTime) == 0 && // and both versions are equal, + cctx.writeThrough() && // and store is enabled, + primary) // and we are primary. + { + CacheObject val = entry.val; + + if (val == null) { + assert entry.deletedUnlocked(); + + cctx.store().remove(null, entry.key); + } + else + cctx.store().put(null, entry.key, val, entry.ver); + } + + treeOp = IgniteTree.OperationType.NOOP; + + updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.CONFLICT_USE_OLD, + entry.val, + null, + invokeRes, + CU.TTL_ETERNAL, + CU.EXPIRE_TIME_ETERNAL, + null, + null, + 0); + } + // Will update something. + else { + // Merge is a local update which override passed value bytes. + if (conflictCtx.isMerge()) { + writeObj = cctx.toCacheObject(conflictCtx.mergeValue()); + + conflictVer = null; + } + else + assert conflictCtx.isUseNew(); + + // Update value is known at this point, so update operation type. + op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE; + } + + return conflictCtx; + } + else + // Nullify conflict version on this update, so that we will use regular version during next updates. + conflictVer = null; + + return null; + } + + /** + * @param invokeRes Entry processor result (for invoke operation). + * @throws IgniteCheckedException If failed. + */ + private void versionCheck(@Nullable IgniteBiTuple<Object, Exception> invokeRes) throws IgniteCheckedException { + GridCacheContext cctx = entry.context(); + + boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY; + + if (verCheck) { + if (!entry.isStartVersion() && ATOMIC_VER_COMPARATOR.compare(entry.ver, newVer, ignoreTime) >= 0) { + if (ATOMIC_VER_COMPARATOR.compare(entry.ver, newVer, ignoreTime) == 0 && cctx.writeThrough() && primary) { + if (log.isDebugEnabled()) + log.debug("Received entry update with same version as current (will update store) " + + "[entry=" + this + ", newVer=" + newVer + ']'); + + CacheObject val = entry.val; + + if (val == null) { + assert entry.deletedUnlocked(); + + cctx.store().remove(null, entry.key); + } + else + cctx.store().put(null, entry.key, val, entry.ver); + } + else { + if (log.isDebugEnabled()) + log.debug("Received entry update with smaller version than current (will ignore) " + + "[entry=" + this + ", newVer=" + newVer + ']'); + } + + treeOp = IgniteTree.OperationType.NOOP; + + updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.VERSION_CHECK_FAILED, + entry.val, + null, + invokeRes, + CU.TTL_ETERNAL, + CU.EXPIRE_TIME_ETERNAL, + null, + null, + 0); + } + } + else + assert entry.isStartVersion() || ATOMIC_VER_COMPARATOR.compare(entry.ver, newVer, ignoreTime) <= 0 : + "Invalid version for inner update [isNew=" + entry.isStartVersion() + ", entry=" + this + ", newVer=" + newVer + ']'; + } + + /** + * @param invokeEntry Entry for {@link EntryProcessor}. 
+ * @return Entry processor return value. + */ + @SuppressWarnings("unchecked") + private IgniteBiTuple<Object, Exception> runEntryProcessor(CacheInvokeEntry<Object, Object> invokeEntry) { + EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj; + + try { + Object computed = entryProcessor.process(invokeEntry, invokeArgs); + + if (invokeEntry.modified()) { + GridCacheContext cctx = entry.context(); + + writeObj = cctx.toCacheObject(cctx.unwrapTemporary(invokeEntry.getValue())); + } + else + writeObj = invokeEntry.valObj; + + if (computed != null) + return new IgniteBiTuple<>(entry.cctx.unwrapTemporary(computed), null); + + return null; + } + catch (Exception e) { + writeObj = invokeEntry.valObj; + + return new IgniteBiTuple<>(null, e); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(AtomicCacheUpdateClosure.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java index 2355b7c..97cb534 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java @@ -29,8 +29,8 @@ import org.jetbrains.annotations.Nullable; * Cache entry atomic update result. */ public class GridCacheUpdateAtomicResult { - /** Success flag.*/ - private final boolean success; + /** Update operation outcome. */ + private final UpdateOutcome outcome; /** Old value. */ @GridToStringInclude @@ -54,9 +54,6 @@ public class GridCacheUpdateAtomicResult { @GridToStringInclude private final GridCacheVersionConflictContext<?, ?> conflictRes; - /** Whether update should be propagated to DHT node. */ - private final boolean sndToDht; - /** */ private final long updateCntr; @@ -66,7 +63,7 @@ public class GridCacheUpdateAtomicResult { /** * Constructor. * - * @param success Success flag. + * @param outcome Update outcome. * @param oldVal Old value. * @param newVal New value. * @param res Value computed by the {@link EntryProcessor}. @@ -74,10 +71,9 @@ public class GridCacheUpdateAtomicResult { * @param conflictExpireTime Explicit DR expire time (if any). * @param rmvVer Version for deferred delete. * @param conflictRes DR resolution result. - * @param sndToDht Whether update should be propagated to DHT node. * @param updateCntr Partition update counter. 
*/ - public GridCacheUpdateAtomicResult(boolean success, + GridCacheUpdateAtomicResult(UpdateOutcome outcome, @Nullable CacheObject oldVal, @Nullable CacheObject newVal, @Nullable IgniteBiTuple<Object, Exception> res, @@ -85,9 +81,10 @@ public class GridCacheUpdateAtomicResult { long conflictExpireTime, @Nullable GridCacheVersion rmvVer, @Nullable GridCacheVersionConflictContext<?, ?> conflictRes, - boolean sndToDht, long updateCntr) { - this.success = success; + assert outcome != null; + + this.outcome = outcome; this.oldVal = oldVal; this.newVal = newVal; this.res = res; @@ -95,11 +92,17 @@ public class GridCacheUpdateAtomicResult { this.conflictExpireTime = conflictExpireTime; this.rmvVer = rmvVer; this.conflictRes = conflictRes; - this.sndToDht = sndToDht; this.updateCntr = updateCntr; } /** + * @return Update operation outcome. + */ + UpdateOutcome outcome() { + return outcome; + } + + /** * @return Value computed by the {@link EntryProcessor}. */ @Nullable public IgniteBiTuple<Object, Exception> computedResult() { @@ -110,7 +113,7 @@ public class GridCacheUpdateAtomicResult { * @return Success flag. */ public boolean success() { - return success; + return outcome.success(); } /** @@ -167,7 +170,74 @@ public class GridCacheUpdateAtomicResult { * @return Whether update should be propagated to DHT node. */ public boolean sendToDht() { - return sndToDht; + return outcome.sendToDht(); + } + + /** + * + */ + public enum UpdateOutcome { + /** */ + CONFLICT_USE_OLD(false, false, false), + + /** */ + VERSION_CHECK_FAILED(false, false, false), + + /** */ + FILTER_FAILED(false, false, true), + + /** */ + INVOKE_NO_OP(false, false, true), + + /** */ + INTERCEPTOR_CANCEL(false, false, true), + + /** */ + REMOVE_NO_VAL(false, true, true), + + /** */ + SUCCESS(true, true, true); + + /** */ + private final boolean success; + + /** */ + private final boolean sndToDht; + + /** */ + private final boolean updateReadMetrics; + + /** + * @param success Success flag. + * @param sndToDht Whether update should be propagated to DHT node. + * @param updateReadMetrics Metrics update flag. + */ + UpdateOutcome(boolean success, boolean sndToDht, boolean updateReadMetrics) { + this.success = success; + this.sndToDht = sndToDht; + this.updateReadMetrics = updateReadMetrics; + } + + /** + * @return Success flag. + */ + public boolean success() { + return success; + } + + /** + * @return Whether update should be propagated to DHT node. + */ + public boolean sendToDht() { + return sndToDht; + } + + /** + * @return Metrics update flag. + */ + public boolean updateReadMetrics() { + return updateReadMetrics; + } } /** {@inheritDoc} */
