Repository: ignite Updated Branches: refs/heads/ignite-627 [created] 7aa249870
ignite-627 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7aa24987 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7aa24987 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7aa24987 Branch: refs/heads/ignite-627 Commit: 7aa249870ce7963cda4ad25fb2566b46a4fcf596 Parents: 797e7af Author: sboikov <sboi...@apache.org> Authored: Fri Oct 26 13:21:07 2018 +0300 Committer: sboikov <sboi...@apache.org> Committed: Tue Oct 30 08:54:21 2018 +0300 ---------------------------------------------------------------------- .../GridNearAtomicAbstractUpdateFuture.java | 293 ++++++++++++++++++- .../GridNearAtomicSingleUpdateFuture.java | 34 ++- .../dht/atomic/GridNearAtomicUpdateFuture.java | 69 +++-- .../distributed/near/GridNearAtomicCache.java | 198 +------------ .../distributed/near/GridNearCacheEntry.java | 4 +- ...idCacheValueConsistencyAbstractSelfTest.java | 6 - .../atomic/IgniteCacheAtomicProtocolTest.java | 44 +++ 7 files changed, 410 insertions(+), 238 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index 983b18a..83d0bb7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -36,16 +36,25 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; +import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter; import org.apache.ignite.internal.processors.cache.GridCacheMvccManager; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; +import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -55,7 +64,10 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; +import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; /** * Base for near atomic update futures. @@ -142,6 +154,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture /** Operation result. */ protected GridCacheReturn opRes; + /** */ + protected Map<KeyCacheObject, GridNearCacheEntry> reservedEntries; + /** * Constructor. * @@ -242,29 +257,40 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture * Performs future mapping. */ public final void map() { + map(false); + } + + /** + * Performs future mapping. + * + * @param remap Remap flag. + */ + protected final void map(boolean remap) { AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null); if (topVer == null) - mapOnTopology(); + mapOnTopology(remap); else { topLocked = true; // Cannot remap. remapCnt = 1; - map(topVer); + map(topVer, remap); } } /** * @param topVer Topology version. + * @param remap Remap flag. */ - protected abstract void map(AffinityTopologyVersion topVer); + protected abstract void map(AffinityTopologyVersion topVer, boolean remap); /** * Maps future on ready topology. + * @param remap Remap flag. */ - protected abstract void mapOnTopology(); + protected abstract void mapOnTopology(boolean remap); /** {@inheritDoc} */ @Override public IgniteUuid futureId() { @@ -357,7 +383,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture if (futId != null) cctx.mvcc().removeAtomicFuture(futId); - super.onDone(retval, err); + if (super.onDone(retval, err) && nearEnabled) + releaseNearCacheEntries(); } /** {@inheritDoc} */ @@ -380,6 +407,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture if (futId != null) cctx.mvcc().removeAtomicFuture(futId); + if (nearEnabled) + releaseNearCacheEntries(); + return true; } @@ -387,6 +417,25 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture } /** + * + */ + private void releaseNearCacheEntries() { + Map<KeyCacheObject, GridNearCacheEntry> reservedEntries0; + + synchronized (this) { + if (reservedEntries == null|| reservedEntries.isEmpty()) + return; + + reservedEntries0 = reservedEntries; + + reservedEntries = null; + } + + for (GridNearCacheEntry entry : reservedEntries0.values()) + entry.releaseEviction(); + } + + /** * @param req Request. * @param res Response. */ @@ -471,6 +520,240 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture } /** + * @return Near cache. + */ + protected final GridNearAtomicCache nearCache() { + return (GridNearAtomicCache)cctx.dht().near(); + } + + /** + * @param key Key, + * @param topVer Update topology version. + */ + protected final void reserveNearCacheEntry(KeyCacheObject key, AffinityTopologyVersion topVer) { + assert nearEnabled; + assert reservedEntries != null; + + if (cctx.affinityNode() && cctx.affinity().partitionBelongs(cctx.localNode(), cctx.affinity().partition(key), topVer)) + return; + + GridNearAtomicCache nearCache = nearCache(); + + synchronized (this) { + if (reservedEntries.containsKey(key)) + return; + + while (true) { + try { + GridNearCacheEntry entry = nearCache.entryExx(key, topVer); + + entry.reserveEviction(); + + reservedEntries.put(key, entry); + + return; + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry while reserving near cache entry (will retry): " + key); + } + } + } + } + + /** + * @param req Update request. + * @param res Update response. + */ + protected final void processNearAtomicUpdateResponse( + GridNearAtomicAbstractUpdateRequest req, + GridNearAtomicUpdateResponse res + ) { + if (F.size(res.failedKeys()) == req.size()) + return; + + GridNearAtomicCache nearCache = nearCache(); + + /* + * Choose value to be stored in near cache: first check key is not in failed and not in skipped list, + * then check if value was generated on primary node, if not then use value sent in request. + */ + + Collection<KeyCacheObject> failed = res.failedKeys(); + List<Integer> nearValsIdxs = res.nearValuesIndexes(); + List<Integer> skipped = res.skippedIndexes(); + + GridCacheVersion ver = res.nearVersion(); + + assert ver != null : "Failed to find version [req=" + req + ", res=" + res + ']'; + + int nearValIdx = 0; + + String taskName = cctx.kernalContext().task().resolveTaskName(req.taskNameHash()); + + for (int i = 0; i < req.size(); i++) { + if (F.contains(skipped, i)) + continue; + + KeyCacheObject key = req.key(i); + + if (F.contains(failed, key)) + continue; + + if (cctx.affinity().partitionBelongs(cctx.localNode(), cctx.affinity().partition(key), req.topologyVersion())) { // Reader became backup. + GridCacheEntryEx entry = nearCache.peekEx(key); + + if (entry != null && entry.markObsolete(ver)) + nearCache.removeEntry(entry); + + continue; + } + + CacheObject val = null; + + if (F.contains(nearValsIdxs, i)) { + val = res.nearValue(nearValIdx); + + nearValIdx++; + } + else { + assert req.operation() != TRANSFORM; + + if (req.operation() != DELETE) + val = req.value(i); + } + + long ttl = res.nearTtl(i); + long expireTime = res.nearExpireTime(i); + + if (ttl != CU.TTL_NOT_CHANGED && expireTime == CU.EXPIRE_TIME_CALCULATE) + expireTime = CU.toExpireTime(ttl); + + try { + processNearAtomicUpdateResponse( + nearCache, + topVer, + ver, + key, + val, + ttl, + expireTime, + req.keepBinary(), + req.nodeId(), + req.subjectId(), + taskName, + req.operation() == TRANSFORM); + } + catch (IgniteCheckedException e) { + res.addFailedKey(key, new IgniteCheckedException("Failed to update key in near cache: " + key, e)); + } + } + } + + /** + * @param nearCache Near cache. + * @param topVer Update topology version. + * @param ver Version. + * @param key Key. + * @param val Value. + * @param ttl TTL. + * @param expireTime Expire time. + * @param keepBinary Keep binary flag. + * @param nodeId Node ID. + * @param subjId Subject ID. + * @param taskName Task name. + * @param transformedValue {@code True} if transformed value. + * @throws IgniteCheckedException If failed. + */ + private void processNearAtomicUpdateResponse( + GridNearAtomicCache nearCache, + AffinityTopologyVersion topVer, + GridCacheVersion ver, + KeyCacheObject key, + @Nullable CacheObject val, + long ttl, + long expireTime, + boolean keepBinary, + UUID nodeId, + UUID subjId, + String taskName, + boolean transformedValue) throws IgniteCheckedException { + try { + while (true) { + GridNearCacheEntry entry = null; + + try { + entry = nearCache.entryExx(key, topVer); + + GridCacheOperation op = val != null ? UPDATE : DELETE; + + GridCacheUpdateAtomicResult updRes = entry.innerUpdate( + ver, + nodeId, + nodeId, + op, + val, + null, + /*write-through*/false, + /*read-through*/false, + /*retval*/false, + keepBinary, + /*expiry policy*/null, + /*event*/true, + /*metrics*/true, + /*primary*/false, + /*check version*/true, + topVer, + CU.empty0(), + DR_NONE, + ttl, + expireTime, + null, + false, + false, + subjId, + taskName, + null, + null, + null, + transformedValue); + + boolean release; + + synchronized (this) { + GridNearCacheEntry reserved = reservedEntries.remove(key); + + assert reserved == null || reserved == entry; + + release = reserved != null; + } + + if (release) + entry.releaseEviction(); + + if (updRes.removeVersion() != null) + nearCache.context().onDeferredDelete(entry, updRes.removeVersion()); + + break; // While. + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry while updating near cache value (will retry): " + key); + + entry = null; + } + finally { + if (entry != null) + entry.touch(topVer); + } + } + } + catch (GridDhtInvalidPartitionException ignored) { + // Ignore. + } + } + + /** * */ static class NodeResult { http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 4c0d2db..0d12dc7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -17,12 +17,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import javax.cache.expiry.ExpiryPolicy; -import javax.cache.processor.EntryProcessor; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; @@ -39,7 +33,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; +import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; @@ -48,6 +42,13 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.processor.EntryProcessor; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; + import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; @@ -124,6 +125,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda this.key = key; this.val = val; + + reservedEntries = new GridLeanMap<>(1); } /** {@inheritDoc} */ @@ -375,7 +378,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { - mapOnTopology(); + mapOnTopology(true); } }); } @@ -394,13 +397,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda if (res.remapTopologyVersion() != null) return; - GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); - - near.processNearAtomicUpdateResponse(req, res); + processNearAtomicUpdateResponse(req, res); } /** {@inheritDoc} */ - @Override protected void mapOnTopology() { + @Override protected void mapOnTopology(boolean remap) { AffinityTopologyVersion topVer; if (cache.topology().stopping()) { @@ -431,7 +432,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { - mapOnTopology(); + mapOnTopology(remap); } }); } @@ -440,11 +441,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda return; } - map(topVer); + map(topVer, remap); } /** {@inheritDoc} */ - @Override protected void map(AffinityTopologyVersion topVer) { + @Override protected void map(AffinityTopologyVersion topVer, boolean remap) { long futId = cctx.mvcc().nextAtomicId(); Exception err = null; @@ -479,6 +480,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda return; } + if (!remap && nearEnabled) + reserveNearCacheEntry(reqState0.req.key(0), topVer); + // Optimize mapping for single key. sendSingleRequest(reqState0.req.nodeId(), reqState0.req); http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 28ebfb1..dc98854 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -17,15 +17,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import javax.cache.expiry.ExpiryPolicy; -import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; @@ -43,7 +34,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -56,6 +47,16 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.processor.EntryProcessor; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; + import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE; @@ -162,6 +163,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu this.vals = vals; this.conflictPutVals = conflictPutVals; this.conflictRmvVals = conflictRmvVals; + + reservedEntries = U.newHashMap(keys.size()); } /** {@inheritDoc} */ @@ -473,6 +476,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu completeFuture(opRes0, err0, res.futureId()); } + /** + * @param remapTopVer New topology version. + */ private void waitAndRemap(AffinityTopologyVersion remapTopVer) { assert remapTopVer != null; @@ -503,7 +509,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { - mapOnTopology(); + mapOnTopology(true); } }); } @@ -617,13 +623,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (res.remapTopologyVersion() != null) return; - GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); - - near.processNearAtomicUpdateResponse(req, res); + processNearAtomicUpdateResponse(req, res); } /** {@inheritDoc} */ - @Override protected void mapOnTopology() { + @Override protected void mapOnTopology(boolean remap) { AffinityTopologyVersion topVer; if (cache.topology().stopping()) { @@ -652,7 +656,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { - mapOnTopology(); + mapOnTopology(remap); } }); } @@ -661,7 +665,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu return; } - map(topVer, remapKeys); + map(topVer, remap, remapKeys); } /** @@ -725,15 +729,16 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } /** {@inheritDoc} */ - @Override protected void map(AffinityTopologyVersion topVer) { - map(topVer, null); + @Override protected void map(AffinityTopologyVersion topVer, boolean remap) { + map(topVer, remap, null); } /** * @param topVer Topology version. + * @param remap Remap flag. * @param remapKeys Keys to remap. */ - private void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) { + private void map(AffinityTopologyVersion topVer, boolean remap, @Nullable Collection<KeyCacheObject> remapKeys) { Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); if (F.isEmpty(topNodes)) { @@ -758,12 +763,14 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (size == 1) { assert remapKeys == null || remapKeys.size() == 1; - singleReq0 = mapSingleUpdate(topVer, futId, mappingKnown); + singleReq0 = mapSingleUpdate(topVer, futId, mappingKnown, remap); } else { - Map<UUID, PrimaryRequestState> pendingMappings = mapUpdate(topNodes, + Map<UUID, PrimaryRequestState> pendingMappings = mapUpdate( + topNodes, topVer, futId, + remap, remapKeys, mappingKnown); @@ -911,14 +918,17 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu * @param topNodes Cache nodes. * @param topVer Topology version. * @param futId Future ID. + * @param remap Remap flag. * @param remapKeys Keys to remap. * @return Mapping. * @throws Exception If failed. */ @SuppressWarnings("ForLoopReplaceableByForEach") - private Map<UUID, PrimaryRequestState> mapUpdate(Collection<ClusterNode> topNodes, + private Map<UUID, PrimaryRequestState> mapUpdate( + Collection<ClusterNode> topNodes, AffinityTopologyVersion topVer, Long futId, + boolean remap, @Nullable Collection<KeyCacheObject> remapKeys, boolean mappingKnown) throws Exception { Iterator<?> it = null; @@ -1044,6 +1054,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu mapped.addMapping(nodes); mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer); + + if (!remap && nearEnabled) + reserveNearCacheEntry(cacheKey, topVer); } return pendingMappings; @@ -1053,10 +1066,15 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu * @param topVer Topology version. * @param futId Future ID. * @param mappingKnown {@code True} if update mapping is known locally. + * @param remap Remap flag. * @return Request. * @throws Exception If failed. */ - private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, Long futId, boolean mappingKnown) + private PrimaryRequestState mapSingleUpdate( + AffinityTopologyVersion topVer, + Long futId, + boolean mappingKnown, + boolean remap) throws Exception { Object key = F.first(keys); @@ -1155,6 +1173,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu conflictExpireTime, conflictVer); + if (!remap && nearEnabled) + reserveNearCacheEntry(cacheKey, topVer); + return new PrimaryRequestState(req, nodes, true); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 503c324..d29eb02 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -17,20 +17,8 @@ package org.apache.ignite.internal.processors.cache.distributed.near; -import java.io.Externalizable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import javax.cache.processor.EntryProcessor; -import javax.cache.processor.EntryProcessorException; -import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheOperationContext; @@ -41,12 +29,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -61,6 +46,18 @@ import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.EntryProcessorResult; +import java.io.Externalizable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; @@ -123,177 +120,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } /** - * @param req Update request. - * @param res Update response. - */ - public void processNearAtomicUpdateResponse( - GridNearAtomicAbstractUpdateRequest req, - GridNearAtomicUpdateResponse res - ) { - if (F.size(res.failedKeys()) == req.size()) - return; - - /* - * Choose value to be stored in near cache: first check key is not in failed and not in skipped list, - * then check if value was generated on primary node, if not then use value sent in request. - */ - - Collection<KeyCacheObject> failed = res.failedKeys(); - List<Integer> nearValsIdxs = res.nearValuesIndexes(); - List<Integer> skipped = res.skippedIndexes(); - - GridCacheVersion ver = res.nearVersion(); - - assert ver != null : "Failed to find version [req=" + req + ", res=" + res + ']'; - - int nearValIdx = 0; - - String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); - - for (int i = 0; i < req.size(); i++) { - if (F.contains(skipped, i)) - continue; - - KeyCacheObject key = req.key(i); - - if (F.contains(failed, key)) - continue; - - if (ctx.affinity().partitionBelongs(ctx.localNode(), ctx.affinity().partition(key), req.topologyVersion())) { // Reader became backup. - GridCacheEntryEx entry = peekEx(key); - - if (entry != null && entry.markObsolete(ver)) - removeEntry(entry); - - continue; - } - - CacheObject val = null; - - if (F.contains(nearValsIdxs, i)) { - val = res.nearValue(nearValIdx); - - nearValIdx++; - } - else { - assert req.operation() != TRANSFORM; - - if (req.operation() != DELETE) - val = req.value(i); - } - - long ttl = res.nearTtl(i); - long expireTime = res.nearExpireTime(i); - - if (ttl != CU.TTL_NOT_CHANGED && expireTime == CU.EXPIRE_TIME_CALCULATE) - expireTime = CU.toExpireTime(ttl); - - try { - processNearAtomicUpdateResponse(ver, - key, - val, - ttl, - expireTime, - req.keepBinary(), - req.nodeId(), - req.subjectId(), - taskName, - req.operation() == TRANSFORM); - } - catch (IgniteCheckedException e) { - res.addFailedKey(key, new IgniteCheckedException("Failed to update key in near cache: " + key, e)); - } - } - } - - /** - * @param ver Version. - * @param key Key. - * @param val Value. - * @param ttl TTL. - * @param expireTime Expire time. - * @param nodeId Node ID. - * @param subjId Subject ID. - * @param taskName Task name. - * @param transformedValue {@code True} if transformed value. - * @throws IgniteCheckedException If failed. - */ - private void processNearAtomicUpdateResponse( - GridCacheVersion ver, - KeyCacheObject key, - @Nullable CacheObject val, - long ttl, - long expireTime, - boolean keepBinary, - UUID nodeId, - UUID subjId, - String taskName, - boolean transformedValue) throws IgniteCheckedException { - try { - while (true) { - GridCacheEntryEx entry = null; - - AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); - - try { - entry = entryEx(key, topVer); - - GridCacheOperation op = val != null ? UPDATE : DELETE; - - GridCacheUpdateAtomicResult updRes = entry.innerUpdate( - ver, - nodeId, - nodeId, - op, - val, - null, - /*write-through*/false, - /*read-through*/false, - /*retval*/false, - keepBinary, - /*expiry policy*/null, - /*event*/true, - /*metrics*/true, - /*primary*/false, - /*check version*/true, - topVer, - CU.empty0(), - DR_NONE, - ttl, - expireTime, - null, - false, - false, - subjId, - taskName, - null, - null, - null, - transformedValue); - - if (updRes.removeVersion() != null) - ctx.onDeferredDelete(entry, updRes.removeVersion()); - - break; // While. - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Got removed entry while updating near cache value (will retry): " + key); - - entry = null; - } - finally { - if (entry != null) - entry.touch(topVer); - } - } - } - catch (GridDhtInvalidPartitionException ignored) { - // Ignore. - } - } - - /** * @param nodeId Sender node ID. * @param req Dht atomic update request. * @param res Dht atomic update response. http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index c953beb..f6059d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -717,7 +717,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { /** * @throws GridCacheEntryRemovedException If entry was removed. */ - void reserveEviction() throws GridCacheEntryRemovedException { + public void reserveEviction() throws GridCacheEntryRemovedException { lockEntry(); try { @@ -733,7 +733,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { /** * */ - void releaseEviction() { + public void releaseEviction() { lockEntry(); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java index 19f98ff..462ca31 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java @@ -220,9 +220,6 @@ public abstract class GridCacheValueConsistencyAbstractSelfTest extends GridCach * @throws Exception If failed. */ public void testPutConsistencyMultithreaded() throws Exception { - if (nearEnabled()) - fail("https://issues.apache.org/jira/browse/IGNITE-627"); - for (int i = 0; i < 20; i++) { log.info("Iteration: " + i); @@ -273,9 +270,6 @@ public abstract class GridCacheValueConsistencyAbstractSelfTest extends GridCach * @throws Exception If failed. */ public void testPutRemoveConsistencyMultithreaded() throws Exception { - if (nearEnabled()) - fail("https://issues.apache.org/jira/browse/IGNITE-627"); - for (int i = 0; i < 10; i++) { log.info("Iteration: " + i); http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java index 14c8571..1c6b8cb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java @@ -33,6 +33,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; @@ -876,6 +877,49 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testNearEntryUpdateRace() throws Exception { + ccfg = cacheConfiguration(1, FULL_SYNC); + + client = false; + + Ignite srv0 = startGrid(0); + + IgniteCache<Object, Object> srvCache = srv0.cache(TEST_CACHE); + + int key = 0; + + ccfg = null; + + client = true; + + Ignite client1 = startGrid(1); + + IgniteCache<Object, Object> nearCache = client1.createNearCache(TEST_CACHE, new NearCacheConfiguration<>()); + + testSpi(srv0).blockMessages(GridNearAtomicUpdateResponse.class, client1.name()); + + IgniteInternalFuture<?> nearPutFut = GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + nearCache.put(key, 1); + } + }); + + testSpi(srv0).waitForBlocked(); + + srvCache.put(key, 2); + + assertFalse(nearPutFut.isDone()); + + testSpi(srv0).stopBlock(); + + nearPutFut.get(); + + assertEquals(2, nearCache.get(key)); + } + + /** * @param expData Expected cache data. */ private void checkData(Map<Integer, Integer> expData) {