Repository: ignite
Updated Branches:
  refs/heads/ignite-2523-remap-issue-2 [created] f8aabbf3a


Revert "IGNITE-2926: Implemented special version of single-update future for 
ATOMIC cache. Used for PRIMARY mode and only in case of a single key-value pair 
update. Gives 3% garbage reduction. Not much, though, the main goal for this 
change is not performance, but rather infrastructure for further improvements. 
Namely, for single-update request which will be initiated only from this 
future."

This reverts commit e833eb2d3d09112ddca994c17b73441df20524a3.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f8aabbf3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f8aabbf3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f8aabbf3

Branch: refs/heads/ignite-2523-remap-issue-2
Commit: f8aabbf3ab0a20d8d126b105986bc2c42b669a3d
Parents: 97cf2b3
Author: vozerov-gridgain <[email protected]>
Authored: Mon Apr 25 15:27:44 2016 +0300
Committer: vozerov-gridgain <[email protected]>
Committed: Mon Apr 25 15:27:44 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheOperationFilter.java  |  61 --
 .../processors/cache/GridCacheAtomicFuture.java |   5 +
 .../dht/atomic/GridDhtAtomicCache.java          | 107 +--
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   5 +
 .../GridNearAtomicAbstractUpdateFuture.java     | 244 -------
 .../GridNearAtomicSingleUpdateFuture.java       | 645 -------------------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 319 +++++++--
 7 files changed, 301 insertions(+), 1085 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f8aabbf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java
deleted file mode 100644
index 7fdfaac..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Cache operation filter.
- */
-public enum CacheOperationFilter {
-    /** Always pass. */
-    ALWAYS,
-
-    /** No value. */
-    NO_VAL,
-
-    /** Has value. */
-    HAS_VAL,
-
-    /** Equals to value. */
-    EQUALS_VAL;
-
-    /**
-     * Creare predicate from operation filter.
-     *
-     * @param val Optional value.
-     * @return Predicate.
-     */
-    @Nullable public CacheEntryPredicate createPredicate(@Nullable CacheObject 
val) {
-        switch (this) {
-            case ALWAYS:
-                return null;
-
-            case NO_VAL:
-                return new CacheEntryPredicateNoValue();
-
-            case HAS_VAL:
-                return new CacheEntryPredicateHasValue();
-
-            default:
-                assert this == EQUALS_VAL;
-
-                return new CacheEntryPredicateContainsValue(val);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f8aabbf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index c96d00f..359909e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -38,4 +38,9 @@ public interface GridCacheAtomicFuture<R> extends 
GridCacheFuture<R> {
      * @return Future or {@code null} if no need to wait.
      */
     public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion 
topVer);
+
+    /**
+     * @return Future keys.
+     */
+    public Collection<?> keys();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f8aabbf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d28aaaa..d8a0782 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -983,7 +983,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         ctx.checkSecurity(SecurityPermission.CACHE_PUT);
 
-        final GridNearAtomicAbstractUpdateFuture updateFut =
+        final GridNearAtomicUpdateFuture updateFut =
             createSingleUpdateFuture(key, val, proc, invokeArgs, retval, 
filter, waitTopFut);
 
         return asyncOp(new CO<IgniteInternalFuture<Object>>() {
@@ -1015,7 +1015,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         ctx.checkSecurity(SecurityPermission.CACHE_REMOVE);
 
-        final GridNearAtomicAbstractUpdateFuture updateFut =
+        final GridNearAtomicUpdateFuture updateFut =
             createSingleUpdateFuture(key, null, null, null, retval, filter, 
true);
 
         if (statsEnabled)
@@ -1042,7 +1042,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
      * @param waitTopFut Whether to wait for topology future.
      * @return Future.
      */
-    private GridNearAtomicAbstractUpdateFuture createSingleUpdateFuture(
+    private GridNearAtomicUpdateFuture createSingleUpdateFuture(
         K key,
         @Nullable V val,
         @Nullable EntryProcessor proc,
@@ -1054,19 +1054,19 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         GridCacheOperation op;
-        Object val0;
+        Collection vals;
 
         if (val != null) {
             op = UPDATE;
-            val0 = val;
+            vals = Collections.singletonList(val);
         }
         else if (proc != null) {
             op = TRANSFORM;
-            val0 = proc;
+            vals = Collections.singletonList(proc);
         }
         else {
             op = DELETE;
-            val0 = null;
+            vals = null;
         }
 
         GridCacheDrInfo conflictPutVal = null;
@@ -1080,75 +1080,37 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             if (op == UPDATE) {
                 conflictPutVal = new GridCacheDrInfo(ctx.toCacheObject(val), 
ctx.versions().next(dcId));
 
-                val0 = null;
+                vals = null;
             }
             else if (op == GridCacheOperation.TRANSFORM) {
                 conflictPutVal = new GridCacheDrInfo(proc, 
ctx.versions().next(dcId));
 
-                val0 = null;
+                vals = null;
             }
             else
                 conflictRmvVer = ctx.versions().next(dcId);
         }
 
-        CacheEntryPredicate[] filters = CU.filterArray(filter);
-
-        if (conflictPutVal == null && conflictRmvVer == null && 
!isFastMap(filters, op)) {
-            return new GridNearAtomicSingleUpdateFuture(
-                ctx,
-                this,
-                ctx.config().getWriteSynchronizationMode(),
-                op,
-                key,
-                val0,
-                invokeArgs,
-                retval,
-                false,
-                opCtx != null ? opCtx.expiry() : null,
-                filters,
-                ctx.subjectIdPerCall(null, opCtx),
-                ctx.kernalContext().job().currentTaskNameHash(),
-                opCtx != null && opCtx.skipStore(),
-                opCtx != null && opCtx.isKeepBinary(),
-                opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
-                waitTopFut
-            );
-        }
-        else {
-            return new GridNearAtomicUpdateFuture(
-                ctx,
-                this,
-                ctx.config().getWriteSynchronizationMode(),
-                op,
-                Collections.singletonList(key),
-                val0 != null ? Collections.singletonList(val0) : null,
-                invokeArgs,
-                conflictPutVal != null ? Collections.singleton(conflictPutVal) 
: null,
-                conflictRmvVer != null ? Collections.singleton(conflictRmvVer) 
: null,
-                retval,
-                false,
-                opCtx != null ? opCtx.expiry() : null,
-                filters,
-                ctx.subjectIdPerCall(null, opCtx),
-                ctx.kernalContext().job().currentTaskNameHash(),
-                opCtx != null && opCtx.skipStore(),
-                opCtx != null && opCtx.isKeepBinary(),
-                opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
-                waitTopFut);
-        }
-    }
-
-    /**
-     * Whether this is fast-map operation.
-     *
-     * @param filters Filters.
-     * @param op Operation.
-     * @return {@code True} if fast-map.
-     */
-    public boolean isFastMap(CacheEntryPredicate[] filters, GridCacheOperation 
op) {
-        return F.isEmpty(filters) && op != TRANSFORM && 
ctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
-            ctx.config().getAtomicWriteOrderMode() == CLOCK &&
-            !(ctx.writeThrough() && ctx.config().getInterceptor() != null);
+        return new GridNearAtomicUpdateFuture(
+            ctx,
+            this,
+            ctx.config().getWriteSynchronizationMode(),
+            op,
+            Collections.singletonList(key),
+            vals,
+            invokeArgs,
+            conflictPutVal != null ? Collections.singleton(conflictPutVal) : 
null,
+            conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : 
null,
+            retval,
+            false,
+            opCtx != null ? opCtx.expiry() : null,
+            CU.filterArray(filter),
+            ctx.subjectIdPerCall(null, opCtx),
+            ctx.kernalContext().job().currentTaskNameHash(),
+            opCtx != null && opCtx.skipStore(),
+            opCtx != null && opCtx.isKeepBinary(),
+            opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+            waitTopFut);
     }
 
     /**
@@ -2880,15 +2842,10 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         res.nodeId(ctx.localNodeId());
 
-        GridNearAtomicAbstractUpdateFuture fut =
-            
(GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
+        GridNearAtomicUpdateFuture fut = 
(GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
 
-        if (fut != null) {
-            if (fut instanceof GridNearAtomicSingleUpdateFuture)
-                ((GridNearAtomicSingleUpdateFuture)fut).onResult(nodeId, res, 
false);
-            else
-                ((GridNearAtomicUpdateFuture)fut).onResult(nodeId, res, false);
-        }
+        if (fut != null)
+            fut.onResult(nodeId, res, false);
         else
             U.warn(log, "Failed to find near update future for update response 
(will ignore) " +
                 "[nodeId=" + nodeId + ", res=" + res + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/f8aabbf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 5760596..8e91272 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -205,6 +205,11 @@ public class GridDhtAtomicUpdateFuture extends 
GridFutureAdapter<Void>
         return null;
     }
 
+    /** {@inheritDoc} */
+    @Override public Collection<KeyCacheObject> keys() {
+        return keys;
+    }
+
     /**
      * @param entry Entry to map.
      * @param val Value to write.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f8aabbf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
deleted file mode 100644
index 7f52299..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import 
org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
-import org.apache.ignite.internal.processors.cache.GridCacheOperation;
-import org.apache.ignite.internal.processors.cache.GridCacheReturn;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.Nullable;
-
-import javax.cache.expiry.ExpiryPolicy;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
-
-/**
- * Base for near atomic update futures.
- */
-public abstract class GridNearAtomicAbstractUpdateFuture extends 
GridFutureAdapter<Object>
-    implements GridCacheAtomicFuture<Object> {
-    /** Logger reference. */
-    private static final AtomicReference<IgniteLogger> logRef = new 
AtomicReference<>();
-
-    /** Logger. */
-    protected static IgniteLogger log;
-
-    /** Cache context. */
-    protected final GridCacheContext cctx;
-
-    /** Cache. */
-    protected final GridDhtAtomicCache cache;
-
-    /** Write synchronization mode. */
-    protected final CacheWriteSynchronizationMode syncMode;
-
-    /** Update operation. */
-    protected final GridCacheOperation op;
-
-    /** Optional arguments for entry processor. */
-    protected final Object[] invokeArgs;
-
-    /** Return value require flag. */
-    protected final boolean retval;
-
-    /** Raw return value flag. */
-    protected final boolean rawRetval;
-
-    /** Expiry policy. */
-    protected final ExpiryPolicy expiryPlc;
-
-    /** Optional filter. */
-    protected final CacheEntryPredicate[] filter;
-
-    /** Subject ID. */
-    protected final UUID subjId;
-
-    /** Task name hash. */
-    protected final int taskNameHash;
-
-    /** Skip store flag. */
-    protected final boolean skipStore;
-
-    /** Keep binary flag. */
-    protected final boolean keepBinary;
-
-    /** Wait for topology future flag. */
-    protected final boolean waitTopFut;
-
-    /** Near cache flag. */
-    protected final boolean nearEnabled;
-
-    /** Mutex to synchronize state updates. */
-    protected final Object mux = new Object();
-
-    /** Topology locked flag. Set if atomic update is performed inside a TX or 
explicit lock. */
-    protected boolean topLocked;
-
-    /** Remap count. */
-    protected int remapCnt;
-
-    /** Current topology version. */
-    protected AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
-
-    /** */
-    protected GridCacheVersion updVer;
-
-    /** Topology version when got mapping error. */
-    protected AffinityTopologyVersion mapErrTopVer;
-
-    /** */
-    protected int resCnt;
-
-    /** Error. */
-    protected CachePartialUpdateCheckedException err;
-
-    /** Future ID. */
-    protected GridCacheVersion futVer;
-
-    /** Completion future for a particular topology version. */
-    protected GridFutureAdapter<Void> topCompleteFut;
-
-    /** Operation result. */
-    protected GridCacheReturn opRes;
-
-    /**
-     * Constructor.
-     *
-     * @param cctx Cache context.
-     * @param cache Cache.
-     * @param syncMode Synchronization mode.
-     * @param op Operation.
-     * @param invokeArgs Invoke arguments.
-     * @param retval Return value flag.
-     * @param rawRetval Raw return value flag.
-     * @param expiryPlc Expiry policy.
-     * @param filter Filter.
-     * @param subjId Subject ID.
-     * @param taskNameHash Task name hash.
-     * @param skipStore Skip store flag.
-     * @param keepBinary Keep binary flag.
-     * @param remapCnt Remap count.
-     * @param waitTopFut Wait topology future flag.
-     */
-    protected GridNearAtomicAbstractUpdateFuture(
-        GridCacheContext cctx,
-        GridDhtAtomicCache cache,
-        CacheWriteSynchronizationMode syncMode,
-        GridCacheOperation op,
-        @Nullable Object[] invokeArgs,
-        boolean retval,
-        boolean rawRetval,
-        @Nullable ExpiryPolicy expiryPlc,
-        CacheEntryPredicate[] filter,
-        UUID subjId,
-        int taskNameHash,
-        boolean skipStore,
-        boolean keepBinary,
-        int remapCnt,
-        boolean waitTopFut
-    ) {
-        if (log == null)
-            log = U.logger(cctx.kernalContext(), logRef, 
GridFutureAdapter.class);
-
-        this.cctx = cctx;
-        this.cache = cache;
-        this.syncMode = syncMode;
-        this.op = op;
-        this.invokeArgs = invokeArgs;
-        this.retval = retval;
-        this.rawRetval = rawRetval;
-        this.expiryPlc = expiryPlc;
-        this.filter = filter;
-        this.subjId = subjId;
-        this.taskNameHash = taskNameHash;
-        this.skipStore = skipStore;
-        this.keepBinary = keepBinary;
-        this.waitTopFut = waitTopFut;
-
-        nearEnabled = CU.isNearEnabled(cctx);
-
-        if (!waitTopFut)
-            remapCnt = 1;
-
-        this.remapCnt = remapCnt;
-    }
-
-    /**
-     * Performs future mapping.
-     */
-    public void map() {
-        AffinityTopologyVersion topVer = 
cctx.shared().lockedTopologyVersion(null);
-
-        if (topVer == null)
-            mapOnTopology();
-        else {
-            topLocked = true;
-
-            // Cannot remap.
-            remapCnt = 1;
-
-            map(topVer);
-        }
-    }
-
-    /**
-     * @param topVer Topology version.
-     */
-    protected abstract void map(AffinityTopologyVersion topVer);
-
-    /**
-     * Maps future on ready topology.
-     */
-    protected abstract void mapOnTopology();
-
-    /** {@inheritDoc} */
-    @Override public IgniteUuid futureId() {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean trackable() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void markNotTrackable() {
-        // No-op.
-    }
-
-    /**
-     * @return {@code True} future is stored by {@link 
GridCacheMvccManager#addAtomicFuture}.
-     */
-    protected boolean storeFuture() {
-        return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != 
FULL_ASYNC;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f8aabbf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
deleted file mode 100644
index abfc5c9..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ /dev/null
@@ -1,645 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import 
org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheOperation;
-import org.apache.ignite.internal.processors.cache.GridCacheReturn;
-import 
org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-import javax.cache.expiry.ExpiryPolicy;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.UUID;
-
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
-import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
-
-/**
- * DHT atomic cache near update future.
- */
-public class GridNearAtomicSingleUpdateFuture extends 
GridNearAtomicAbstractUpdateFuture {
-    /** Keys */
-    private Object key;
-
-    /** Values. */
-    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
-    private Object val;
-
-    /** Not null is operation is mapped to single node. */
-    private GridNearAtomicUpdateRequest req;
-
-    /**
-     * @param cctx Cache context.
-     * @param cache Cache instance.
-     * @param syncMode Write synchronization mode.
-     * @param op Update operation.
-     * @param key Keys to update.
-     * @param val Values or transform closure.
-     * @param invokeArgs Optional arguments for entry processor.
-     * @param retval Return value require flag.
-     * @param rawRetval {@code True} if should return {@code GridCacheReturn} 
as future result.
-     * @param expiryPlc Expiry policy explicitly specified for cache operation.
-     * @param filter Entry filter.
-     * @param subjId Subject ID.
-     * @param taskNameHash Task name hash code.
-     * @param skipStore Skip store flag.
-     * @param keepBinary Keep binary flag.
-     * @param remapCnt Maximum number of retries.
-     * @param waitTopFut If {@code false} does not wait for affinity change 
future.
-     */
-    public GridNearAtomicSingleUpdateFuture(
-        GridCacheContext cctx,
-        GridDhtAtomicCache cache,
-        CacheWriteSynchronizationMode syncMode,
-        GridCacheOperation op,
-        Object key,
-        @Nullable Object val,
-        @Nullable Object[] invokeArgs,
-        final boolean retval,
-        final boolean rawRetval,
-        @Nullable ExpiryPolicy expiryPlc,
-        final CacheEntryPredicate[] filter,
-        UUID subjId,
-        int taskNameHash,
-        boolean skipStore,
-        boolean keepBinary,
-        int remapCnt,
-        boolean waitTopFut
-    ) {
-        super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, 
expiryPlc, filter, subjId, taskNameHash,
-            skipStore, keepBinary, remapCnt, waitTopFut);
-
-        assert subjId != null;
-
-        this.key = key;
-        this.val = val;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheVersion version() {
-        synchronized (mux) {
-            return futVer;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onNodeLeft(UUID nodeId) {
-        GridNearAtomicUpdateResponse res = null;
-
-        synchronized (mux) {
-            GridNearAtomicUpdateRequest req = this.req != null && 
this.req.nodeId().equals(nodeId) ?
-                this.req : null;
-
-            if (req != null && req.response() == null) {
-                res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
-                    nodeId,
-                    req.futureVersion(),
-                    cctx.deploymentEnabled());
-
-                ClusterTopologyCheckedException e = new 
ClusterTopologyCheckedException("Primary node left grid " +
-                    "before response is received: " + nodeId);
-
-                
e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
-
-                res.addFailedKeys(req.keys(), e);
-            }
-        }
-
-        if (res != null)
-            onResult(nodeId, res, true);
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Void> 
completeFuture(AffinityTopologyVersion topVer) {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("ConstantConditions")
-    @Override public boolean onDone(@Nullable Object res, @Nullable Throwable 
err) {
-        assert res == null || res instanceof GridCacheReturn;
-
-        GridCacheReturn ret = (GridCacheReturn)res;
-
-        Object retval =
-            res == null ? null : rawRetval ? ret : (this.retval || op == 
TRANSFORM) ?
-                cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : 
ret.success();
-
-        if (op == TRANSFORM && retval == null)
-            retval = Collections.emptyMap();
-
-        if (super.onDone(retval, err)) {
-            GridCacheVersion futVer = onFutureDone();
-
-            if (futVer != null)
-                cctx.mvcc().removeAtomicFuture(futVer);
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * Response callback.
-     *
-     * @param nodeId Node ID.
-     * @param res Update response.
-     * @param nodeErr {@code True} if response was created on node failure.
-     */
-    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
-    public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, 
boolean nodeErr) {
-        GridNearAtomicUpdateRequest req;
-
-        AffinityTopologyVersion remapTopVer = null;
-
-        GridCacheReturn opRes0 = null;
-        CachePartialUpdateCheckedException err0 = null;
-
-        GridFutureAdapter<?> fut0 = null;
-
-        synchronized (mux) {
-            if (!res.futureVersion().equals(futVer))
-                return;
-
-            if (!this.req.nodeId().equals(nodeId))
-                return;
-
-            req = this.req;
-
-            this.req = null;
-
-            boolean remapKey = !F.isEmpty(res.remapKeys());
-
-            if (remapKey) {
-                if (mapErrTopVer == null || 
mapErrTopVer.compareTo(req.topologyVersion()) < 0)
-                    mapErrTopVer = req.topologyVersion();
-            }
-            else if (res.error() != null) {
-                if (res.failedKeys() != null) {
-                    if (err == null)
-                        err = new CachePartialUpdateCheckedException(
-                            "Failed to update keys (retry update if 
possible).");
-
-                    Collection<Object> keys = new 
ArrayList<>(res.failedKeys().size());
-
-                    for (KeyCacheObject key : res.failedKeys())
-                        
keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, 
false));
-
-                    err.add(keys, res.error(), req.topologyVersion());
-                }
-            }
-            else {
-                if (!req.fastMap() || req.hasPrimary()) {
-                    GridCacheReturn ret = res.returnValue();
-
-                    if (op == TRANSFORM) {
-                        if (ret != null) {
-                            assert ret.value() == null || ret.value() 
instanceof Map : ret.value();
-
-                            if (ret.value() != null) {
-                                if (opRes != null)
-                                    opRes.mergeEntryProcessResults(ret);
-                                else
-                                    opRes = ret;
-                            }
-                        }
-                    }
-                    else
-                        opRes = ret;
-                }
-            }
-
-            if (remapKey) {
-                assert mapErrTopVer != null;
-
-                remapTopVer = cctx.shared().exchange().topologyVersion();
-            }
-            else {
-                if (err != null &&
-                    X.hasCause(err, CachePartialUpdateCheckedException.class) 
&&
-                    X.hasCause(err, ClusterTopologyCheckedException.class) &&
-                    storeFuture() &&
-                    --remapCnt > 0) {
-                    ClusterTopologyCheckedException topErr =
-                        X.cause(err, ClusterTopologyCheckedException.class);
-
-                    if (!(topErr instanceof 
ClusterTopologyServerNotFoundException)) {
-                        CachePartialUpdateCheckedException cause =
-                            X.cause(err, 
CachePartialUpdateCheckedException.class);
-
-                        assert cause != null && cause.topologyVersion() != 
null : err;
-
-                        remapTopVer =
-                            new 
AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
-
-                        err = null;
-                        updVer = null;
-                    }
-                }
-            }
-
-            if (remapTopVer == null) {
-                err0 = err;
-                opRes0 = opRes;
-            }
-            else {
-                fut0 = topCompleteFut;
-
-                topCompleteFut = null;
-
-                cctx.mvcc().removeAtomicFuture(futVer);
-
-                futVer = null;
-                topVer = AffinityTopologyVersion.ZERO;
-            }
-        }
-
-        if (res.error() != null && res.failedKeys() == null) {
-            onDone(res.error());
-
-            return;
-        }
-
-        if (nearEnabled && !nodeErr)
-            updateNear(req, res);
-
-        if (remapTopVer != null) {
-            if (fut0 != null)
-                fut0.onDone();
-
-            if (!waitTopFut) {
-                onDone(new GridCacheTryPutFailedException());
-
-                return;
-            }
-
-            if (topLocked) {
-                CachePartialUpdateCheckedException e =
-                    new CachePartialUpdateCheckedException("Failed to update 
keys (retry update if possible).");
-
-                ClusterTopologyCheckedException cause = new 
ClusterTopologyCheckedException(
-                    "Failed to update keys, topology changed while execute 
atomic update inside transaction.");
-
-                
cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
-
-                e.add(Collections.singleton(cctx.toCacheKeyObject(key)), 
cause);
-
-                onDone(e);
-
-                return;
-            }
-
-            IgniteInternalFuture<AffinityTopologyVersion> fut =
-                cctx.shared().exchange().affinityReadyFuture(remapTopVer);
-
-            if (fut == null)
-                fut = new GridFinishedFuture<>(remapTopVer);
-
-            fut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                @Override public void apply(final 
IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                    cctx.kernalContext().closure().runLocalSafe(new Runnable() 
{
-                        @Override public void run() {
-                            try {
-                                AffinityTopologyVersion topVer = fut.get();
-
-                                map(topVer);
-                            }
-                            catch (IgniteCheckedException e) {
-                                onDone(e);
-                            }
-                        }
-                    });
-                }
-            });
-
-            return;
-        }
-
-        onDone(opRes0, err0);
-    }
-
-    /**
-     * Updates near cache.
-     *
-     * @param req Update request.
-     * @param res Update response.
-     */
-    private void updateNear(GridNearAtomicUpdateRequest req, 
GridNearAtomicUpdateResponse res) {
-        assert nearEnabled;
-
-        if (res.remapKeys() != null || !req.hasPrimary())
-            return;
-
-        GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
-
-        near.processNearAtomicUpdateResponse(req, res);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void mapOnTopology() {
-        cache.topology().readLock();
-
-        AffinityTopologyVersion topVer = null;
-
-        try {
-            if (cache.topology().stopping()) {
-                onDone(new IgniteCheckedException("Failed to perform cache 
operation (cache is stopped): " +
-                    cache.name()));
-
-                return;
-            }
-
-            GridDhtTopologyFuture fut = 
cache.topology().topologyVersionFuture();
-
-            if (fut.isDone()) {
-                Throwable err = fut.validateCache(cctx);
-
-                if (err != null) {
-                    onDone(err);
-
-                    return;
-                }
-
-                topVer = fut.topologyVersion();
-            }
-            else {
-                if (waitTopFut) {
-                    assert !topLocked : this;
-
-                    fut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                            cctx.kernalContext().closure().runLocalSafe(new 
Runnable() {
-                                @Override public void run() {
-                                    mapOnTopology();
-                                }
-                            });
-                        }
-                    });
-                }
-                else
-                    onDone(new GridCacheTryPutFailedException());
-
-                return;
-            }
-        }
-        finally {
-            cache.topology().readUnlock();
-        }
-
-        map(topVer);
-    }
-
-    /**
-     * Maps future to single node.
-     *
-     * @param nodeId Node ID.
-     * @param req Request.
-     */
-    private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
-        if (cctx.localNodeId().equals(nodeId)) {
-            cache.updateAllAsyncInternal(nodeId, req,
-                new CI2<GridNearAtomicUpdateRequest, 
GridNearAtomicUpdateResponse>() {
-                    @Override public void apply(GridNearAtomicUpdateRequest 
req, GridNearAtomicUpdateResponse res) {
-                        onResult(res.nodeId(), res, false);
-                    }
-                });
-        }
-        else {
-            try {
-                if (log.isDebugEnabled())
-                    log.debug("Sending near atomic update request [nodeId=" + 
req.nodeId() + ", req=" + req + ']');
-
-                cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
-
-                if (syncMode == FULL_ASYNC)
-                    onDone(new GridCacheReturn(cctx, true, true, null, true));
-            }
-            catch (IgniteCheckedException e) {
-                onSendError(req, e);
-            }
-        }
-    }
-
-    /**
-     * @param req Request.
-     * @param e Error.
-     */
-    void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException 
e) {
-        synchronized (mux) {
-            GridNearAtomicUpdateResponse res = new 
GridNearAtomicUpdateResponse(cctx.cacheId(),
-                req.nodeId(),
-                req.futureVersion(),
-                cctx.deploymentEnabled());
-
-            res.addFailedKeys(req.keys(), e);
-
-            onResult(req.nodeId(), res, true);
-        }
-    }
-
-    /** {@inheritDoc} */
-    protected void map(AffinityTopologyVersion topVer) {
-        Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
-
-        if (F.isEmpty(topNodes)) {
-            onDone(new ClusterTopologyServerNotFoundException("Failed to map 
keys for cache (all partition nodes " +
-                "left the grid)."));
-
-            return;
-        }
-
-        Exception err = null;
-        GridNearAtomicUpdateRequest singleReq0 = null;
-
-        GridCacheVersion futVer = cctx.versions().next(topVer);
-
-        GridCacheVersion updVer;
-
-        // Assign version on near node in CLOCK ordering mode even if fastMap 
is false.
-        if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
-            updVer = this.updVer;
-
-            if (updVer == null) {
-                updVer = cctx.versions().next(topVer);
-
-                if (log.isDebugEnabled())
-                    log.debug("Assigned fast-map version for update on near 
node: " + updVer);
-            }
-        }
-        else
-            updVer = null;
-
-        try {
-            singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
-
-            synchronized (mux) {
-                assert this.futVer == null : this;
-                assert this.topVer == AffinityTopologyVersion.ZERO : this;
-
-                this.topVer = topVer;
-                this.updVer = updVer;
-                this.futVer = futVer;
-
-                resCnt = 0;
-
-                req = singleReq0;
-            }
-        }
-        catch (Exception e) {
-            err = e;
-        }
-
-        if (err != null) {
-            onDone(err);
-
-            return;
-        }
-
-        if (storeFuture()) {
-            if (!cctx.mvcc().addAtomicFuture(futVer, this)) {
-                assert isDone() : this;
-
-                return;
-            }
-        }
-
-        // Optimize mapping for single key.
-        mapSingle(singleReq0.nodeId(), singleReq0);
-    }
-
-    /**
-     * @return Future version.
-     */
-    GridCacheVersion onFutureDone() {
-        GridCacheVersion ver0;
-
-        GridFutureAdapter<Void> fut0;
-
-        synchronized (mux) {
-            fut0 = topCompleteFut;
-
-            topCompleteFut = null;
-
-            ver0 = futVer;
-
-            futVer = null;
-        }
-
-        if (fut0 != null)
-            fut0.onDone();
-
-        return ver0;
-    }
-
-    /**
-     * @param topVer Topology version.
-     * @param futVer Future version.
-     * @param updVer Update version.
-     * @return Request.
-     * @throws Exception If failed.
-     */
-    private GridNearAtomicUpdateRequest 
mapSingleUpdate(AffinityTopologyVersion topVer,
-        GridCacheVersion futVer,
-        @Nullable GridCacheVersion updVer) throws Exception {
-        if (key == null)
-            throw new NullPointerException("Null key.");
-
-        Object val = this.val;
-
-        if (val == null && op != GridCacheOperation.DELETE)
-            throw new NullPointerException("Null value.");
-
-        KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
-
-        if (op != TRANSFORM)
-            val = cctx.toCacheObject(val);
-
-        ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
-
-        if (primary == null)
-            throw new ClusterTopologyServerNotFoundException("Failed to map 
keys for cache (all partition nodes " +
-                "left the grid).");
-
-        GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
-            cctx.cacheId(),
-            primary.id(),
-            futVer,
-            false,
-            updVer,
-            topVer,
-            topLocked,
-            syncMode,
-            op,
-            retval,
-            expiryPlc,
-            invokeArgs,
-            filter,
-            subjId,
-            taskNameHash,
-            skipStore,
-            keepBinary,
-            cctx.kernalContext().clientNode(),
-            cctx.deploymentEnabled(),
-            1);
-
-        req.addUpdateEntry(cacheKey,
-            val,
-            CU.TTL_NOT_CHANGED,
-            CU.EXPIRE_TIME_CALCULATE,
-            null,
-            true);
-
-        return req;
-    }
-
-    /** {@inheritDoc} */
-    public String toString() {
-        synchronized (mux) {
-            return S.toString(GridNearAtomicSingleUpdateFuture.class, this, 
super.toString());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f8aabbf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index edebd8c..9955df7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -23,8 +23,10 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -34,7 +36,9 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import 
org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
+import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import 
org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
@@ -53,19 +57,34 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static 
org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 
 /**
  * DHT atomic cache near update future.
  */
-public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFuture {
-    /** Fast map flag. */
-    private final boolean fastMap;
+public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
+    implements GridCacheAtomicFuture<Object>{
+    /** Logger reference. */
+    private static final AtomicReference<IgniteLogger> logRef = new 
AtomicReference<>();
+
+    /** Logger. */
+    protected static IgniteLogger log;
+
+    /** Cache context. */
+    private final GridCacheContext cctx;
+
+    /** Cache. */
+    private GridDhtAtomicCache cache;
+
+    /** Update operation. */
+    private final GridCacheOperation op;
 
     /** Keys */
     private Collection<?> keys;
@@ -74,6 +93,9 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     private Collection<?> vals;
 
+    /** Optional arguments for entry processor. */
+    private Object[] invokeArgs;
+
     /** Conflict put values. */
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     private Collection<GridCacheDrInfo> conflictPutVals;
@@ -82,16 +104,85 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     private Collection<GridCacheVersion> conflictRmvVals;
 
+    /** Return value require flag. */
+    private final boolean retval;
+
+    /** Expiry policy. */
+    private final ExpiryPolicy expiryPlc;
+
+    /** Optional filter. */
+    private final CacheEntryPredicate[] filter;
+
+    /** Write synchronization mode. */
+    private final CacheWriteSynchronizationMode syncMode;
+
+    /** Raw return value flag. */
+    private final boolean rawRetval;
+
+    /** Fast map flag. */
+    private final boolean fastMap;
+
+    /** Near cache flag. */
+    private final boolean nearEnabled;
+
+    /** Subject ID. */
+    private final UUID subjId;
+
+    /** Task name hash. */
+    private final int taskNameHash;
+
+    /** Topology locked flag. Set if atomic update is performed inside a TX or 
explicit lock. */
+    private boolean topLocked;
+
+    /** Skip store flag. */
+    private final boolean skipStore;
+
+    /** */
+    private final boolean keepBinary;
+
+    /** Wait for topology future flag. */
+    private final boolean waitTopFut;
+
+    /** Remap count. */
+    private int remapCnt;
+
+    /** Mutex to synchronize state updates. */
+    private final Object mux = new Object();
+
+    /** Current topology version. */
+    private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
+
+    /** */
+    private GridCacheVersion updVer;
+
+    /** Topology version when got mapping error. */
+    private AffinityTopologyVersion mapErrTopVer;
+
     /** Mappings if operations is mapped to more than one node. */
     @GridToStringInclude
     private Map<UUID, GridNearAtomicUpdateRequest> mappings;
 
+    /** */
+    private int resCnt;
+
+    /** Error. */
+    private CachePartialUpdateCheckedException err;
+
+    /** Future ID. */
+    private GridCacheVersion futVer;
+
+    /** Completion future for a particular topology version. */
+    private GridFutureAdapter<Void> topCompleteFut;
+
     /** Keys to remap. */
     private Collection<KeyCacheObject> remapKeys;
 
     /** Not null is operation is mapped to single node. */
     private GridNearAtomicUpdateRequest singleReq;
 
+    /** Operation result. */
+    private GridCacheReturn opRes;
+
     /**
      * @param cctx Cache context.
      * @param cache Cache instance.
@@ -134,20 +225,49 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
         int remapCnt,
         boolean waitTopFut
     ) {
-        super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, 
expiryPlc, filter, subjId, taskNameHash,
-            skipStore, keepBinary, remapCnt, waitTopFut);
+        this.rawRetval = rawRetval;
 
         assert vals == null || vals.size() == keys.size();
         assert conflictPutVals == null || conflictPutVals.size() == 
keys.size();
         assert conflictRmvVals == null || conflictRmvVals.size() == 
keys.size();
         assert subjId != null;
 
+        this.cctx = cctx;
+        this.cache = cache;
+        this.syncMode = syncMode;
+        this.op = op;
         this.keys = keys;
         this.vals = vals;
+        this.invokeArgs = invokeArgs;
         this.conflictPutVals = conflictPutVals;
         this.conflictRmvVals = conflictRmvVals;
+        this.retval = retval;
+        this.expiryPlc = expiryPlc;
+        this.filter = filter;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.skipStore = skipStore;
+        this.keepBinary = keepBinary;
+        this.waitTopFut = waitTopFut;
+
+        if (log == null)
+            log = U.logger(cctx.kernalContext(), logRef, 
GridFutureAdapter.class);
 
-        fastMap = cache.isFastMap(filter, op);
+        fastMap = F.isEmpty(filter) && op != TRANSFORM && 
cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
+            cctx.config().getAtomicWriteOrderMode() == CLOCK &&
+            !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
+
+        nearEnabled = CU.isNearEnabled(cctx);
+
+        if (!waitTopFut)
+            remapCnt = 1;
+
+        this.remapCnt = remapCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid futureId() {
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
@@ -157,6 +277,19 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
         }
     }
 
+    /**
+     * @return {@code True} if this future should block partition map exchange.
+     */
+    private boolean waitForPartitionExchange() {
+        // Wait fast-map near atomic update futures in CLOCK mode.
+        return fastMap;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<?> keys() {
+        return keys;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
         GridNearAtomicUpdateResponse res = null;
@@ -191,21 +324,37 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Void> 
completeFuture(AffinityTopologyVersion topVer) {
-        // Wait fast-map near atomic update futures in CLOCK mode.
-        if (fastMap) {
-            GridFutureAdapter<Void> fut;
+    @Override public boolean trackable() {
+        return true;
+    }
 
-            synchronized (mux) {
-                if (this.topVer != AffinityTopologyVersion.ZERO && 
this.topVer.compareTo(topVer) < 0) {
-                    if (topCompleteFut == null)
-                        topCompleteFut = new GridFutureAdapter<>();
+    /** {@inheritDoc} */
+    @Override public void markNotTrackable() {
+        // No-op.
+    }
 
-                    fut = topCompleteFut;
-                }
-                else
-                    fut = null;
-            }
+    /**
+     * Performs future mapping.
+     */
+    public void map() {
+        AffinityTopologyVersion topVer = 
cctx.shared().lockedTopologyVersion(null);
+
+        if (topVer == null)
+            mapOnTopology();
+        else {
+            topLocked = true;
+
+            // Cannot remap.
+            remapCnt = 1;
+
+            map(topVer, null);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<Void> 
completeFuture(AffinityTopologyVersion topVer) {
+        if (waitForPartitionExchange()) {
+            GridFutureAdapter<Void> fut = completeFuture0(topVer);
 
             if (fut != null && isDone()) {
                 fut.onDone();
@@ -305,34 +454,16 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
                     mapErrTopVer = req.topologyVersion();
             }
             else if (res.error() != null) {
-                if (res.failedKeys() != null) {
-                    if (err == null)
-                        err = new CachePartialUpdateCheckedException(
-                            "Failed to update keys (retry update if 
possible).");
-
-                    Collection<Object> keys = new 
ArrayList<>(res.failedKeys().size());
-
-                    for (KeyCacheObject key : res.failedKeys())
-                        
keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, 
false));
-
-                    err.add(keys, res.error(), req.topologyVersion());
-                }
+                if (res.failedKeys() != null)
+                    addFailedKeys(res.failedKeys(), req.topologyVersion(), 
res.error());
             }
             else {
                 if (!req.fastMap() || req.hasPrimary()) {
                     GridCacheReturn ret = res.returnValue();
 
                     if (op == TRANSFORM) {
-                        if (ret != null) {
-                            assert ret.value() == null || ret.value() 
instanceof Map : ret.value();
-
-                            if (ret.value() != null) {
-                                if (opRes != null)
-                                    opRes.mergeEntryProcessResults(ret);
-                                else
-                                    opRes = ret;
-                            }
-                        }
+                        if (ret != null)
+                            addInvokeResults(ret);
                     }
                     else
                         opRes = ret;
@@ -489,8 +620,10 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
         near.processNearAtomicUpdateResponse(req, res);
     }
 
-    /** {@inheritDoc} */
-    @Override protected void mapOnTopology() {
+    /**
+     * Maps future on ready topology.
+     */
+    private void mapOnTopology() {
         cache.topology().readLock();
 
         AffinityTopologyVersion topVer = null;
@@ -544,6 +677,35 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
     }
 
     /**
+     * @return {@code True} future is stored by {@link 
GridCacheMvccManager#addAtomicFuture}.
+     */
+    private boolean storeFuture() {
+        return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != 
FULL_ASYNC;
+    }
+
+    /**
+     * Maps key to nodes. If filters are absent and operation is not 
TRANSFORM, then we can assign version on near
+     * node and send updates in parallel to all participating nodes.
+     *
+     * @param key Key to map.
+     * @param topVer Topology version to map.
+     * @param fastMap Flag indicating whether mapping is performed for 
fast-circuit update.
+     * @return Collection of nodes to which key is mapped.
+     */
+    private Collection<ClusterNode> mapKey(
+        KeyCacheObject key,
+        AffinityTopologyVersion topVer,
+        boolean fastMap
+    ) {
+        GridCacheAffinityManager affMgr = cctx.affinity();
+
+        // If we can send updates in parallel - do it.
+        return fastMap ?
+            cctx.topology().nodes(affMgr.partition(key), topVer) :
+            Collections.singletonList(affMgr.primary(key, topVer));
+    }
+
+    /**
      * Maps future to single node.
      *
      * @param nodeId Node ID.
@@ -635,11 +797,6 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
         }
     }
 
-    /** {@inheritDoc} */
-    protected void map(AffinityTopologyVersion topVer) {
-        map(topVer, null);
-    }
-
     /**
      * @param topVer Topology version.
      * @param remapKeys Keys to remap.
@@ -757,6 +914,26 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
     }
 
     /**
+     * @param topVer Topology version.
+     * @return Future.
+     */
+    @Nullable GridFutureAdapter<Void> completeFuture0(AffinityTopologyVersion 
topVer) {
+        synchronized (mux) {
+            if (this.topVer == AffinityTopologyVersion.ZERO)
+                return null;
+
+            if (this.topVer.compareTo(topVer) < 0) {
+                if (topCompleteFut == null)
+                    topCompleteFut = new GridFutureAdapter<>();
+
+                return topCompleteFut;
+            }
+
+            return null;
+        }
+    }
+
+    /**
      * @return Future version.
      */
     GridCacheVersion onFutureDone() {
@@ -862,7 +1039,7 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
             if (op != TRANSFORM)
                 val = cctx.toCacheObject(val);
 
-            Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer);
+            Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, 
fastMap);
 
             if (affNodes.isEmpty())
                 throw new ClusterTopologyServerNotFoundException("Failed to 
map keys for cache " +
@@ -1013,19 +1190,41 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
     }
 
     /**
-     * Maps key to nodes. If filters are absent and operation is not 
TRANSFORM, then we can assign version on near
-     * node and send updates in parallel to all participating nodes.
-     *
-     * @param key Key to map.
-     * @param topVer Topology version to map.
-     * @return Collection of nodes to which key is mapped.
+     * @param ret Result from single node.
      */
-    private Collection<ClusterNode> mapKey(KeyCacheObject key, 
AffinityTopologyVersion topVer) {
-        GridCacheAffinityManager affMgr = cctx.affinity();
+    @SuppressWarnings("unchecked")
+    private void addInvokeResults(GridCacheReturn ret) {
+        assert op == TRANSFORM : op;
+        assert ret.value() == null || ret.value() instanceof Map : ret.value();
+
+        if (ret.value() != null) {
+            if (opRes != null)
+                opRes.mergeEntryProcessResults(ret);
+            else
+                opRes = ret;
+        }
+    }
 
-        // If we can send updates in parallel - do it.
-        return fastMap ? cctx.topology().nodes(affMgr.partition(key), topVer) :
-            Collections.singletonList(affMgr.primary(key, topVer));
+    /**
+     * @param failedKeys Failed keys.
+     * @param topVer Topology version for failed update.
+     * @param err Error cause.
+     */
+    private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
+        AffinityTopologyVersion topVer,
+        Throwable err) {
+        CachePartialUpdateCheckedException err0 = this.err;
+
+        if (err0 == null)
+            err0 = this.err =
+                new CachePartialUpdateCheckedException("Failed to update keys 
(retry update if possible).");
+
+        Collection<Object> keys = new ArrayList<>(failedKeys.size());
+
+        for (KeyCacheObject key : failedKeys)
+            keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, 
keepBinary, false));
+
+        err0.add(keys, err, topVer);
     }
 
     /** {@inheritDoc} */

Reply via email to