Repository: ignite
Updated Branches:
  refs/heads/ignite-627 [created] 7aa249870


ignite-627


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7aa24987
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7aa24987
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7aa24987

Branch: refs/heads/ignite-627
Commit: 7aa249870ce7963cda4ad25fb2566b46a4fcf596
Parents: 797e7af
Author: sboikov <sboi...@apache.org>
Authored: Fri Oct 26 13:21:07 2018 +0300
Committer: sboikov <sboi...@apache.org>
Committed: Tue Oct 30 08:54:21 2018 +0300

----------------------------------------------------------------------
 .../GridNearAtomicAbstractUpdateFuture.java     | 293 ++++++++++++++++++-
 .../GridNearAtomicSingleUpdateFuture.java       |  34 ++-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  69 +++--
 .../distributed/near/GridNearAtomicCache.java   | 198 +------------
 .../distributed/near/GridNearCacheEntry.java    |   4 +-
 ...idCacheValueConsistencyAbstractSelfTest.java |   6 -
 .../atomic/IgniteCacheAtomicProtocolTest.java   |  44 +++
 7 files changed, 410 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 983b18a..83d0bb7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -36,16 +36,25 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheObject;
 import 
org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -55,7 +64,10 @@ import org.jetbrains.annotations.Nullable;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static 
org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
 
 /**
  * Base for near atomic update futures.
@@ -142,6 +154,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture 
extends GridCacheFuture
     /** Operation result. */
     protected GridCacheReturn opRes;
 
+    /** Near cache entries whose eviction was reserved by this future, keyed by cache key. */
+    protected Map<KeyCacheObject, GridNearCacheEntry> reservedEntries;
+
     /**
      * Constructor.
      *
@@ -242,29 +257,40 @@ public abstract class GridNearAtomicAbstractUpdateFuture 
extends GridCacheFuture
      * Performs future mapping.
      */
     public final void map() {
+        map(false);
+    }
+
+    /**
+     * Performs future mapping.
+     *
+     * @param remap Remap flag.
+     */
+    protected final void map(boolean remap) {
         AffinityTopologyVersion topVer = 
cctx.shared().lockedTopologyVersion(null);
 
         if (topVer == null)
-            mapOnTopology();
+            mapOnTopology(remap);
         else {
             topLocked = true;
 
             // Cannot remap.
             remapCnt = 1;
 
-            map(topVer);
+            map(topVer, remap);
         }
     }
 
     /**
      * @param topVer Topology version.
+     * @param remap Remap flag.
      */
-    protected abstract void map(AffinityTopologyVersion topVer);
+    protected abstract void map(AffinityTopologyVersion topVer, boolean remap);
 
     /**
      * Maps future on ready topology.
+     * @param remap Remap flag.
      */
-    protected abstract void mapOnTopology();
+    protected abstract void mapOnTopology(boolean remap);
 
     /** {@inheritDoc} */
     @Override public IgniteUuid futureId() {
@@ -357,7 +383,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture 
extends GridCacheFuture
         if (futId != null)
             cctx.mvcc().removeAtomicFuture(futId);
 
-        super.onDone(retval, err);
+        if (super.onDone(retval, err) && nearEnabled)
+            releaseNearCacheEntries();
     }
 
     /** {@inheritDoc} */
@@ -380,6 +407,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture 
extends GridCacheFuture
             if (futId != null)
                 cctx.mvcc().removeAtomicFuture(futId);
 
+            if (nearEnabled)
+                releaseNearCacheEntries();
+
             return true;
         }
 
@@ -387,6 +417,25 @@ public abstract class GridNearAtomicAbstractUpdateFuture 
extends GridCacheFuture
     }
 
     /**
+     * Releases the eviction reservation of every near cache entry still reserved by this future.
+     */
+    private void releaseNearCacheEntries() {
+        Map<KeyCacheObject, GridNearCacheEntry> reservedEntries0;
+
+        synchronized (this) {
+            if (reservedEntries == null|| reservedEntries.isEmpty())
+                return;
+
+            reservedEntries0 = reservedEntries;
+
+            reservedEntries = null;
+        }
+
+        for (GridNearCacheEntry entry : reservedEntries0.values())
+            entry.releaseEviction();
+    }
+
+    /**
      * @param req Request.
      * @param res Response.
      */
@@ -471,6 +520,240 @@ public abstract class GridNearAtomicAbstractUpdateFuture 
extends GridCacheFuture
     }
 
     /**
+     * @return Near cache.
+     */
+    protected final GridNearAtomicCache nearCache() {
+        return (GridNearAtomicCache)cctx.dht().near();
+    }
+
+    /**
+     * @param key Key.
+     * @param topVer Update topology version.
+     */
+    protected final void reserveNearCacheEntry(KeyCacheObject key, 
AffinityTopologyVersion topVer) {
+        assert nearEnabled;
+        assert reservedEntries != null;
+
+        if (cctx.affinityNode() && 
cctx.affinity().partitionBelongs(cctx.localNode(), 
cctx.affinity().partition(key), topVer))
+            return;
+
+        GridNearAtomicCache nearCache = nearCache();
+
+        synchronized (this) {
+            if (reservedEntries.containsKey(key))
+                return;
+
+            while (true) {
+                try {
+                    GridNearCacheEntry entry =  nearCache.entryExx(key, 
topVer);
+
+                    entry.reserveEviction();
+
+                    reservedEntries.put(key, entry);
+
+                    return;
+                }
+                catch (GridCacheEntryRemovedException ignored) {
+                    if (log.isDebugEnabled())
+                        log.debug("Got removed entry while reserving near 
cache entry (will retry): " + key);
+                }
+            }
+        }
+    }
+
+    /**
+     * @param req Update request.
+     * @param res Update response.
+     */
+    protected final void processNearAtomicUpdateResponse(
+            GridNearAtomicAbstractUpdateRequest req,
+            GridNearAtomicUpdateResponse res
+    ) {
+        if (F.size(res.failedKeys()) == req.size())
+            return;
+
+        GridNearAtomicCache nearCache = nearCache();
+
+        /*
+         * Choose value to be stored in near cache: first check that key is not in failed and not in skipped list,
+         * then check if value was generated on primary node; if not, use value sent in request.
+         */
+
+        Collection<KeyCacheObject> failed = res.failedKeys();
+        List<Integer> nearValsIdxs = res.nearValuesIndexes();
+        List<Integer> skipped = res.skippedIndexes();
+
+        GridCacheVersion ver = res.nearVersion();
+
+        assert ver != null : "Failed to find version [req=" + req + ", res=" + 
res + ']';
+
+        int nearValIdx = 0;
+
+        String taskName = 
cctx.kernalContext().task().resolveTaskName(req.taskNameHash());
+
+        for (int i = 0; i < req.size(); i++) {
+            if (F.contains(skipped, i))
+                continue;
+
+            KeyCacheObject key = req.key(i);
+
+            if (F.contains(failed, key))
+                continue;
+
+            if (cctx.affinity().partitionBelongs(cctx.localNode(), 
cctx.affinity().partition(key), req.topologyVersion())) { // Reader became 
backup.
+                GridCacheEntryEx entry = nearCache.peekEx(key);
+
+                if (entry != null && entry.markObsolete(ver))
+                    nearCache.removeEntry(entry);
+
+                continue;
+            }
+
+            CacheObject val = null;
+
+            if (F.contains(nearValsIdxs, i)) {
+                val = res.nearValue(nearValIdx);
+
+                nearValIdx++;
+            }
+            else {
+                assert req.operation() != TRANSFORM;
+
+                if (req.operation() != DELETE)
+                    val = req.value(i);
+            }
+
+            long ttl = res.nearTtl(i);
+            long expireTime = res.nearExpireTime(i);
+
+            if (ttl != CU.TTL_NOT_CHANGED && expireTime == 
CU.EXPIRE_TIME_CALCULATE)
+                expireTime = CU.toExpireTime(ttl);
+
+            try {
+                processNearAtomicUpdateResponse(
+                    nearCache,
+                    topVer,
+                    ver,
+                    key,
+                    val,
+                    ttl,
+                    expireTime,
+                    req.keepBinary(),
+                    req.nodeId(),
+                    req.subjectId(),
+                    taskName,
+                    req.operation() == TRANSFORM);
+            }
+            catch (IgniteCheckedException e) {
+                res.addFailedKey(key, new IgniteCheckedException("Failed to 
update key in near cache: " + key, e));
+            }
+        }
+    }
+
+    /**
+     * @param nearCache Near cache.
+     * @param topVer Update topology version.
+     * @param ver Version.
+     * @param key Key.
+     * @param val Value.
+     * @param ttl TTL.
+     * @param expireTime Expire time.
+     * @param keepBinary Keep binary flag.
+     * @param nodeId Node ID.
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param transformedValue {@code True} if transformed value.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void processNearAtomicUpdateResponse(
+        GridNearAtomicCache nearCache,
+        AffinityTopologyVersion topVer,
+        GridCacheVersion ver,
+        KeyCacheObject key,
+        @Nullable CacheObject val,
+        long ttl,
+        long expireTime,
+        boolean keepBinary,
+        UUID nodeId,
+        UUID subjId,
+        String taskName,
+        boolean transformedValue) throws IgniteCheckedException {
+        try {
+            while (true) {
+                GridNearCacheEntry entry = null;
+
+                try {
+                    entry = nearCache.entryExx(key, topVer);
+
+                    GridCacheOperation op = val != null ? UPDATE : DELETE;
+
+                    GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
+                            ver,
+                            nodeId,
+                            nodeId,
+                            op,
+                            val,
+                            null,
+                            /*write-through*/false,
+                            /*read-through*/false,
+                            /*retval*/false,
+                            keepBinary,
+                            /*expiry policy*/null,
+                            /*event*/true,
+                            /*metrics*/true,
+                            /*primary*/false,
+                            /*check version*/true,
+                            topVer,
+                            CU.empty0(),
+                            DR_NONE,
+                            ttl,
+                            expireTime,
+                            null,
+                            false,
+                            false,
+                            subjId,
+                            taskName,
+                            null,
+                            null,
+                            null,
+                            transformedValue);
+
+                    boolean release;
+
+                    synchronized (this) {
+                        GridNearCacheEntry reserved = 
reservedEntries.remove(key);
+
+                        assert reserved == null || reserved == entry;
+
+                        release = reserved != null;
+                    }
+
+                    if (release)
+                        entry.releaseEviction();
+
+                    if (updRes.removeVersion() != null)
+                        nearCache.context().onDeferredDelete(entry, 
updRes.removeVersion());
+
+                    break; // While.
+                }
+                catch (GridCacheEntryRemovedException ignored) {
+                    if (log.isDebugEnabled())
+                        log.debug("Got removed entry while updating near cache 
value (will retry): " + key);
+
+                    entry = null;
+                }
+                finally {
+                    if (entry != null)
+                        entry.touch(topVer);
+                }
+            }
+        }
+        catch (GridDhtInvalidPartitionException ignored) {
+            // Ignore.
+        }
+    }
+
+    /**
      *
      */
     static class NodeResult {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 4c0d2db..0d12dc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -17,12 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -39,7 +33,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import org.apache.ignite.internal.util.GridLeanMap;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
@@ -48,6 +42,13 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -124,6 +125,8 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridNearAtomicAbstractUpda
 
         this.key = key;
         this.val = val;
+
+        reservedEntries = new GridLeanMap<>(1);
     }
 
     /** {@inheritDoc} */
@@ -375,7 +378,7 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridNearAtomicAbstractUpda
             @Override public void apply(final 
IgniteInternalFuture<AffinityTopologyVersion> fut) {
                 cctx.kernalContext().closure().runLocalSafe(new Runnable() {
                     @Override public void run() {
-                        mapOnTopology();
+                        mapOnTopology(true);
                     }
                 });
             }
@@ -394,13 +397,11 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridNearAtomicAbstractUpda
         if (res.remapTopologyVersion() != null)
             return;
 
-        GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
-
-        near.processNearAtomicUpdateResponse(req, res);
+        processNearAtomicUpdateResponse(req, res);
     }
 
     /** {@inheritDoc} */
-    @Override protected void mapOnTopology() {
+    @Override protected void mapOnTopology(boolean remap) {
         AffinityTopologyVersion topVer;
 
         if (cache.topology().stopping()) {
@@ -431,7 +432,7 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridNearAtomicAbstractUpda
                 @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
                     cctx.kernalContext().closure().runLocalSafe(new Runnable() 
{
                         @Override public void run() {
-                            mapOnTopology();
+                            mapOnTopology(remap);
                         }
                     });
                 }
@@ -440,11 +441,11 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridNearAtomicAbstractUpda
             return;
         }
 
-        map(topVer);
+        map(topVer, remap);
     }
 
     /** {@inheritDoc} */
-    @Override protected void map(AffinityTopologyVersion topVer) {
+    @Override protected void map(AffinityTopologyVersion topVer, boolean 
remap) {
         long futId = cctx.mvcc().nextAtomicId();
 
         Exception err = null;
@@ -479,6 +480,9 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridNearAtomicAbstractUpda
             return;
         }
 
+        if (!remap && nearEnabled)
+            reserveNearCacheEntry(reqState0.req.key(0), topVer);
+
         // Optimize mapping for single key.
         sendSingleRequest(reqState0.req.nodeId(), reqState0.req);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 28ebfb1..dc98854 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -17,15 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
@@ -43,7 +34,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -56,6 +47,16 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
@@ -162,6 +163,8 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
         this.vals = vals;
         this.conflictPutVals = conflictPutVals;
         this.conflictRmvVals = conflictRmvVals;
+
+        reservedEntries = U.newHashMap(keys.size());
     }
 
     /** {@inheritDoc} */
@@ -473,6 +476,9 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
             completeFuture(opRes0, err0, res.futureId());
     }
 
+    /**
+     * @param remapTopVer New topology version.
+     */
     private void waitAndRemap(AffinityTopologyVersion remapTopVer) {
         assert remapTopVer != null;
 
@@ -503,7 +509,7 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
             @Override public void apply(final 
IgniteInternalFuture<AffinityTopologyVersion> fut) {
                 cctx.kernalContext().closure().runLocalSafe(new Runnable() {
                     @Override public void run() {
-                        mapOnTopology();
+                        mapOnTopology(true);
                     }
                 });
             }
@@ -617,13 +623,11 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
         if (res.remapTopologyVersion() != null)
             return;
 
-        GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
-
-        near.processNearAtomicUpdateResponse(req, res);
+        processNearAtomicUpdateResponse(req, res);
     }
 
     /** {@inheritDoc} */
-    @Override protected void mapOnTopology() {
+    @Override protected void mapOnTopology(boolean remap) {
         AffinityTopologyVersion topVer;
 
         if (cache.topology().stopping()) {
@@ -652,7 +656,7 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
                 @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
                     cctx.kernalContext().closure().runLocalSafe(new Runnable() 
{
                         @Override public void run() {
-                            mapOnTopology();
+                            mapOnTopology(remap);
                         }
                     });
                 }
@@ -661,7 +665,7 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
             return;
         }
 
-        map(topVer, remapKeys);
+        map(topVer, remap, remapKeys);
     }
 
     /**
@@ -725,15 +729,16 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
     }
 
     /** {@inheritDoc} */
-    @Override protected void map(AffinityTopologyVersion topVer) {
-        map(topVer, null);
+    @Override protected void map(AffinityTopologyVersion topVer, boolean 
remap) {
+        map(topVer, remap, null);
     }
 
     /**
      * @param topVer Topology version.
+     * @param remap Remap flag.
      * @param remapKeys Keys to remap.
      */
-    private void map(AffinityTopologyVersion topVer, @Nullable 
Collection<KeyCacheObject> remapKeys) {
+    private void map(AffinityTopologyVersion topVer, boolean remap, @Nullable 
Collection<KeyCacheObject> remapKeys) {
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
         if (F.isEmpty(topNodes)) {
@@ -758,12 +763,14 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
             if (size == 1) {
                 assert remapKeys == null || remapKeys.size() == 1;
 
-                singleReq0 = mapSingleUpdate(topVer, futId, mappingKnown);
+                singleReq0 = mapSingleUpdate(topVer, futId, mappingKnown, 
remap);
             }
             else {
-                Map<UUID, PrimaryRequestState> pendingMappings = 
mapUpdate(topNodes,
+                Map<UUID, PrimaryRequestState> pendingMappings = mapUpdate(
+                    topNodes,
                     topVer,
                     futId,
+                    remap,
                     remapKeys,
                     mappingKnown);
 
@@ -911,14 +918,17 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
      * @param topNodes Cache nodes.
      * @param topVer Topology version.
      * @param futId Future ID.
+     * @param remap Remap flag.
      * @param remapKeys Keys to remap.
      * @return Mapping.
      * @throws Exception If failed.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    private Map<UUID, PrimaryRequestState> mapUpdate(Collection<ClusterNode> 
topNodes,
+    private Map<UUID, PrimaryRequestState> mapUpdate(
+        Collection<ClusterNode> topNodes,
         AffinityTopologyVersion topVer,
         Long futId,
+        boolean remap,
         @Nullable Collection<KeyCacheObject> remapKeys,
         boolean mappingKnown) throws Exception {
         Iterator<?> it = null;
@@ -1044,6 +1054,9 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
                 mapped.addMapping(nodes);
 
             mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, 
conflictExpireTime, conflictVer);
+
+            if (!remap && nearEnabled)
+                reserveNearCacheEntry(cacheKey, topVer);
         }
 
         return pendingMappings;
@@ -1053,10 +1066,15 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
      * @param topVer Topology version.
      * @param futId Future ID.
      * @param mappingKnown {@code True} if update mapping is known locally.
+     * @param remap Remap flag.
      * @return Request.
      * @throws Exception If failed.
      */
-    private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion 
topVer, Long futId, boolean mappingKnown)
+    private PrimaryRequestState mapSingleUpdate(
+        AffinityTopologyVersion topVer,
+        Long futId,
+        boolean mappingKnown,
+        boolean remap)
         throws Exception {
         Object key = F.first(keys);
 
@@ -1155,6 +1173,9 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
             conflictExpireTime,
             conflictVer);
 
+        if (!remap && nearEnabled)
+            reserveNearCacheEntry(cacheKey, topVer);
+
         return new PrimaryRequestState(req, nodes, true);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 503c324..d29eb02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -17,20 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
-import java.io.Externalizable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheOperationContext;
@@ -41,12 +29,9 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -61,6 +46,18 @@ import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
 
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import java.io.Externalizable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -123,177 +120,6 @@ public class GridNearAtomicCache<K, V> extends 
GridNearCacheAdapter<K, V> {
     }
 
     /**
-     * @param req Update request.
-     * @param res Update response.
-     */
-    public void processNearAtomicUpdateResponse(
-        GridNearAtomicAbstractUpdateRequest req,
-        GridNearAtomicUpdateResponse res
-    ) {
-        if (F.size(res.failedKeys()) == req.size())
-            return;
-
-        /*
-         * Choose value to be stored in near cache: first check key is not in 
failed and not in skipped list,
-         * then check if value was generated on primary node, if not then use 
value sent in request.
-         */
-
-        Collection<KeyCacheObject> failed = res.failedKeys();
-        List<Integer> nearValsIdxs = res.nearValuesIndexes();
-        List<Integer> skipped = res.skippedIndexes();
-
-        GridCacheVersion ver = res.nearVersion();
-
-        assert ver != null : "Failed to find version [req=" + req + ", res=" + 
res + ']';
-
-        int nearValIdx = 0;
-
-        String taskName = 
ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
-
-        for (int i = 0; i < req.size(); i++) {
-            if (F.contains(skipped, i))
-                continue;
-
-            KeyCacheObject key = req.key(i);
-
-            if (F.contains(failed, key))
-                continue;
-
-            if (ctx.affinity().partitionBelongs(ctx.localNode(), 
ctx.affinity().partition(key), req.topologyVersion())) { // Reader became 
backup.
-                GridCacheEntryEx entry = peekEx(key);
-
-                if (entry != null && entry.markObsolete(ver))
-                    removeEntry(entry);
-
-                continue;
-            }
-
-            CacheObject val = null;
-
-            if (F.contains(nearValsIdxs, i)) {
-                val = res.nearValue(nearValIdx);
-
-                nearValIdx++;
-            }
-            else {
-                assert req.operation() != TRANSFORM;
-
-                if (req.operation() != DELETE)
-                    val = req.value(i);
-            }
-
-            long ttl = res.nearTtl(i);
-            long expireTime = res.nearExpireTime(i);
-
-            if (ttl != CU.TTL_NOT_CHANGED && expireTime == 
CU.EXPIRE_TIME_CALCULATE)
-                expireTime = CU.toExpireTime(ttl);
-
-            try {
-                processNearAtomicUpdateResponse(ver,
-                    key,
-                    val,
-                    ttl,
-                    expireTime,
-                    req.keepBinary(),
-                    req.nodeId(),
-                    req.subjectId(),
-                    taskName,
-                    req.operation() == TRANSFORM);
-            }
-            catch (IgniteCheckedException e) {
-                res.addFailedKey(key, new IgniteCheckedException("Failed to 
update key in near cache: " + key, e));
-            }
-        }
-    }
-
-    /**
-     * @param ver Version.
-     * @param key Key.
-     * @param val Value.
-     * @param ttl TTL.
-     * @param expireTime Expire time.
-     * @param nodeId Node ID.
-     * @param subjId Subject ID.
-     * @param taskName Task name.
-     * @param transformedValue {@code True} if transformed value.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void processNearAtomicUpdateResponse(
-        GridCacheVersion ver,
-        KeyCacheObject key,
-        @Nullable CacheObject val,
-        long ttl,
-        long expireTime,
-        boolean keepBinary,
-        UUID nodeId,
-        UUID subjId,
-        String taskName,
-        boolean transformedValue) throws IgniteCheckedException {
-        try {
-            while (true) {
-                GridCacheEntryEx entry = null;
-
-                AffinityTopologyVersion topVer = 
ctx.affinity().affinityTopologyVersion();
-
-                try {
-                    entry = entryEx(key, topVer);
-
-                    GridCacheOperation op = val != null ? UPDATE : DELETE;
-
-                    GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
-                        ver,
-                        nodeId,
-                        nodeId,
-                        op,
-                        val,
-                        null,
-                        /*write-through*/false,
-                        /*read-through*/false,
-                        /*retval*/false,
-                        keepBinary,
-                        /*expiry policy*/null,
-                        /*event*/true,
-                        /*metrics*/true,
-                        /*primary*/false,
-                        /*check version*/true,
-                        topVer,
-                        CU.empty0(),
-                        DR_NONE,
-                        ttl,
-                        expireTime,
-                        null,
-                        false,
-                        false,
-                        subjId,
-                        taskName,
-                        null,
-                        null,
-                        null,
-                        transformedValue);
-
-                    if (updRes.removeVersion() != null)
-                        ctx.onDeferredDelete(entry, updRes.removeVersion());
-
-                    break; // While.
-                }
-                catch (GridCacheEntryRemovedException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Got removed entry while updating near cache 
value (will retry): " + key);
-
-                    entry = null;
-                }
-                finally {
-                    if (entry != null)
-                        entry.touch(topVer);
-                }
-            }
-        }
-        catch (GridDhtInvalidPartitionException ignored) {
-            // Ignore.
-        }
-    }
-
-    /**
      * @param nodeId Sender node ID.
      * @param req Dht atomic update request.
      * @param res Dht atomic update response.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index c953beb..f6059d6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -717,7 +717,7 @@ public class GridNearCacheEntry extends 
GridDistributedCacheEntry {
     /**
      * @throws GridCacheEntryRemovedException If entry was removed.
      */
-    void reserveEviction() throws GridCacheEntryRemovedException {
+    public void reserveEviction() throws GridCacheEntryRemovedException {
         lockEntry();
 
         try {
@@ -733,7 +733,7 @@ public class GridNearCacheEntry extends 
GridDistributedCacheEntry {
     /**
      *
      */
-    void releaseEviction() {
+    public void releaseEviction() {
         lockEntry();
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
index 19f98ff..462ca31 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
@@ -220,9 +220,6 @@ public abstract class 
GridCacheValueConsistencyAbstractSelfTest extends GridCach
      * @throws Exception If failed.
      */
     public void testPutConsistencyMultithreaded() throws Exception {
-        if (nearEnabled())
-            fail("https://issues.apache.org/jira/browse/IGNITE-627");
-
         for (int i = 0; i < 20; i++) {
             log.info("Iteration: " + i);
 
@@ -273,9 +270,6 @@ public abstract class 
GridCacheValueConsistencyAbstractSelfTest extends GridCach
      * @throws Exception If failed.
      */
     public void testPutRemoveConsistencyMultithreaded() throws Exception {
-        if (nearEnabled())
-            fail("https://issues.apache.org/jira/browse/IGNITE-627");
-
        for (int i = 0; i < 10; i++) {
            log.info("Iteration: " + i);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index 14c8571..1c6b8cb 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
@@ -876,6 +877,49 @@ public class IgniteCacheAtomicProtocolTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testNearEntryUpdateRace() throws Exception {
+        ccfg = cacheConfiguration(1, FULL_SYNC);
+
+        client = false;
+
+        Ignite srv0 = startGrid(0);
+
+        IgniteCache<Object, Object> srvCache = srv0.cache(TEST_CACHE);
+
+        int key = 0;
+
+        ccfg = null;
+
+        client = true;
+
+        Ignite client1 = startGrid(1);
+
+        IgniteCache<Object, Object> nearCache = 
client1.createNearCache(TEST_CACHE, new NearCacheConfiguration<>());
+
+        testSpi(srv0).blockMessages(GridNearAtomicUpdateResponse.class, 
client1.name());
+
+        IgniteInternalFuture<?> nearPutFut = GridTestUtils.runAsync(new 
Runnable() {
+            @Override public void run() {
+                nearCache.put(key, 1);
+            }
+        });
+
+        testSpi(srv0).waitForBlocked();
+
+        srvCache.put(key, 2);
+
+        assertFalse(nearPutFut.isDone());
+
+        testSpi(srv0).stopBlock();
+
+        nearPutFut.get();
+
+        assertEquals(2, nearCache.get(key));
+    }
+
+    /**
      * @param expData Expected cache data.
      */
     private void checkData(Map<Integer, Integer> expData) {

Reply via email to