Refactored DHT future.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a6760696
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a6760696
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a6760696

Branch: refs/heads/ignite-2523-1-resp-dht
Commit: a6760696c106b7d62e6bedca6e80b96b579ef4d1
Parents: 9833dab
Author: vozerov-gridgain <[email protected]>
Authored: Thu Apr 28 15:56:04 2016 +0300
Committer: vozerov-gridgain <[email protected]>
Committed: Thu Apr 28 15:56:04 2016 +0300

----------------------------------------------------------------------
 .../GridDhtAtomicAbstractUpdateFuture.java      | 492 +++++++++++++++++++
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   | 393 ++-------------
 2 files changed, 521 insertions(+), 364 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a6760696/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
new file mode 100644
index 0000000..4e166b1
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.processor.EntryProcessor;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Abstract DHT atomic update future.
+ */
+public abstract class GridDhtAtomicAbstractUpdateFuture extends 
GridFutureAdapter<Void>
+    implements GridCacheAtomicFuture<Void> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Logger reference. */
+    protected static final AtomicReference<IgniteLogger> logRef = new 
AtomicReference<>();
+
+    /** Logger. */
+    protected static IgniteLogger log;
+
+    /** Future version. */
+    protected final GridCacheVersion futVer;
+
+    /** Cache context. */
+    protected final GridCacheContext cctx;
+
+    /** Update request. */
+    protected final GridNearAtomicAbstractUpdateRequest updateReq;
+
+    /** Update response. */
+    protected final GridNearAtomicAbstractUpdateResponse updateRes;
+
+    /** Completion callback. */
+    @GridToStringExclude
+    protected final CI2<GridNearAtomicAbstractUpdateRequest, 
GridNearAtomicAbstractUpdateResponse> completionCb;
+
+    /** Write version. */
+    protected final GridCacheVersion writeVer;
+
+    /** */
+    protected final boolean waitForExchange;
+
+    /** Force transform backup flag. */
+    protected boolean forceTransformBackups;
+
+    /** Response count. */
+    protected volatile int resCnt;
+
+    /** Continuous query closures. */
+    // TODO: Optimize.
+    private Collection<CI1<Boolean>> cntQryClsrs;
+
+    /**
+     * Constructor.
+     *
+     * @param cctx Cache context.
+     * @param updateReq Near request.
+     * @param updateRes Near response.
+     * @param completionCb Completion callback.
+     * @param writeVer Write version.
+     */
+    protected GridDhtAtomicAbstractUpdateFuture(
+        GridCacheContext cctx,
+        GridNearAtomicAbstractUpdateRequest updateReq,
+        GridNearAtomicAbstractUpdateResponse updateRes,
+        CI2<GridNearAtomicAbstractUpdateRequest, 
GridNearAtomicAbstractUpdateResponse> completionCb,
+        GridCacheVersion writeVer) {
+        if (log == null)
+            log = U.logger(cctx.kernalContext(), logRef, 
GridDhtAtomicUpdateFuture.class);
+
+        futVer = cctx.versions().next(updateReq.topologyVersion());
+
+        this.cctx = cctx;
+        this.updateReq = updateReq;
+        this.updateRes = updateRes;
+        this.completionCb = completionCb;
+        this.writeVer = writeVer;
+
+        waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() 
&& !updateReq.clientRequest()));
+    }
+
+    /**
+     * @param entry Entry to map.
+     * @param val Value to write.
+     * @param entryProcessor Entry processor.
+     * @param ttl TTL (optional).
+     * @param conflictExpireTime Conflict expire time (optional).
+     * @param conflictVer Conflict version (optional).
+     * @param addPrevVal If {@code true} sends previous value to backups.
+     * @param prevVal Previous value.
+     * @param updateCntr Partition update counter.
+     */
+    public void addWriteEntry(GridDhtCacheEntry entry,
+        @Nullable CacheObject val,
+        EntryProcessor<Object, Object, Object> entryProcessor,
+        long ttl,
+        long conflictExpireTime,
+        @Nullable GridCacheVersion conflictVer,
+        boolean addPrevVal,
+        @Nullable CacheObject prevVal,
+        long updateCntr) {
+        Collection<ClusterNode> dhtNodes = 
cctx.dht().topology().nodes(entry.partition(), updateReq.topologyVersion());
+
+        if (log.isDebugEnabled())
+            log.debug("Mapping entry to DHT nodes [nodes=" + 
U.nodeIds(dhtNodes) + ", entry=" + entry + ']');
+
+        addKey(entry.key());
+
+        // TODO: Avoid iteration, we usually will have only one node here.
+        for (ClusterNode node : dhtNodes) {
+            UUID nodeId = node.id();
+
+            if (!nodeId.equals(cctx.localNodeId())) {
+                GridDhtAtomicUpdateRequest req = mapping(nodeId);
+
+                if (req == null) {
+                    req = new GridDhtAtomicUpdateRequest(
+                        cctx.cacheId(),
+                        nodeId,
+                        futVer,
+                        writeVer,
+                        updateReq.writeSynchronizationMode(),
+                        updateReq.topologyVersion(),
+                        forceTransformBackups,
+                        this.updateReq.subjectId(),
+                        this.updateReq.taskNameHash(),
+                        forceTransformBackups ? 
this.updateReq.invokeArguments() : null,
+                        cctx.deploymentEnabled(),
+                        this.updateReq.keepBinary());
+
+                    mapping(nodeId, req);
+                }
+
+                req.addWriteValue(entry.key(),
+                    val,
+                    entryProcessor,
+                    ttl,
+                    conflictExpireTime,
+                    conflictVer,
+                    addPrevVal,
+                    entry.partition(),
+                    prevVal,
+                    updateCntr);
+            }
+        }
+    }
+
+    /**
+     * @param readers Entry readers.
+     * @param entry Entry.
+     * @param val Value.
+     * @param entryProcessor Entry processor..
+     * @param ttl TTL for near cache update (optional).
+     * @param expireTime Expire time for near cache update (optional).
+     */
+    public void addNearWriteEntries(Iterable<UUID> readers,
+        GridDhtCacheEntry entry,
+        @Nullable CacheObject val,
+        EntryProcessor<Object, Object, Object> entryProcessor,
+        long ttl,
+        long expireTime) {
+        addKey(entry.key());
+
+        for (UUID nodeId : readers) {
+            GridDhtAtomicUpdateRequest req = mapping(nodeId);
+
+            if (req == null) {
+                ClusterNode node = cctx.discovery().node(nodeId);
+
+                // Node left the grid.
+                if (node == null)
+                    continue;
+
+                req = new GridDhtAtomicUpdateRequest(
+                    cctx.cacheId(),
+                    nodeId,
+                    futVer,
+                    writeVer,
+                    updateReq.writeSynchronizationMode(),
+                    updateReq.topologyVersion(),
+                    forceTransformBackups,
+                    updateReq.subjectId(),
+                    updateReq.taskNameHash(),
+                    forceTransformBackups ? updateReq.invokeArguments() : null,
+                    cctx.deploymentEnabled(),
+                    updateReq.keepBinary());
+
+                mapping(nodeId, req);
+            }
+
+            nearReaderEntry(entry.key(), entry);
+
+            req.addNearWriteValue(entry.key(),
+                val,
+                entryProcessor,
+                ttl,
+                expireTime);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid futureId() {
+        return futVer.asGridUuid();
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion version() {
+        return futVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean trackable() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void markNotTrackable() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<Void> 
completeFuture(AffinityTopologyVersion topVer) {
+        if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 
0)
+            return this;
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        if (log.isDebugEnabled())
+            log.debug("Processing node leave event [fut=" + this + ", nodeId=" 
+ nodeId + ']');
+
+        return registerResponse(nodeId);
+    }
+
+    /**
+     * Callback for backup update response.
+     *
+     * @param nodeId Backup node ID.
+     * @param updateRes Update response.
+     */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+    public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) {
+        if (log.isDebugEnabled())
+            log.debug("Received DHT atomic update future result [nodeId=" + 
nodeId + ", updateRes=" + updateRes + ']');
+
+        if (updateRes.error() != null) {
+            List<KeyCacheObject> failed = new 
ArrayList<>(updateRes.failedCount());
+
+            for (int i = 0; i < updateRes.failedCount(); i++)
+                failed.add(updateRes.failed(i));
+
+            this.updateRes.addFailedKeys(failed, updateRes.error());
+        }
+
+        for (int i = 0; i < updateRes.nearEvictedCount(); i++) {
+            KeyCacheObject key = updateRes.nearEvicted(i);
+
+            GridDhtCacheEntry entry = nearReaderEntry(key);
+
+            try {
+                entry.removeReader(nodeId, updateRes.messageId());
+            }
+            catch (GridCacheEntryRemovedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Entry with evicted reader was removed [entry=" 
+ entry + ", err=" + e + ']');
+            }
+        }
+
+        registerResponse(nodeId);
+    }
+
+    /**
+     * Deferred update response.
+     *
+     * @param nodeId Backup node ID.
+     */
+    public void onResult(UUID nodeId) {
+        if (log.isDebugEnabled())
+            log.debug("Received deferred DHT atomic update future result 
[nodeId=" + nodeId + ']');
+
+        registerResponse(nodeId);
+    }
+
+    /**
+     * Sends requests to remote nodes.
+     */
+    public void map() {
+        if (mappingsCount() > 0)
+            map0();
+        else
+            onDone();
+
+        // Send response right away if no ACKs from backup is required.
+        // Backups will send ACKs anyway, future will be completed after all 
backups have replied.
+        if (updateReq.writeSynchronizationMode() != FULL_SYNC)
+            completionCb.apply(updateReq, updateRes);
+    }
+
+    /**
+     * @param clsr Continuous query closure.
+     */
+    public void addContinuousQueryClosure(CI1<Boolean> clsr){
+        assert !isDone() : this;
+
+        if (cntQryClsrs == null)
+            cntQryClsrs = new ArrayList<>(10);
+
+        cntQryClsrs.add(clsr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(@Nullable Void res, @Nullable Throwable 
err) {
+        if (super.onDone(res, err)) {
+            cctx.mvcc().removeAtomicFuture(version());
+
+            boolean suc = err == null;
+
+            if (!suc)
+                markAllKeysFailed(err);
+
+            if (cntQryClsrs != null) {
+                for (CI1<Boolean> clsr : cntQryClsrs)
+                    clsr.apply(suc);
+            }
+
+            if (updateReq.writeSynchronizationMode() == FULL_SYNC)
+                completionCb.apply(updateReq, updateRes);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Add key.
+     *
+     * @param key Key.
+     */
+    protected abstract void addKey(KeyCacheObject key);
+
+    /**
+     * Mark all request keys as failed.
+     *
+     * @param err Error.
+     */
+    protected abstract void markAllKeysFailed(@Nullable Throwable err);
+
+    /**
+     * Internal mapping routine.
+     */
+    protected abstract void map0();
+
+    /**
+     * Add mapping.
+     *
+     * @param nodeId Node ID.
+     * @param req Request.
+     */
+    protected abstract void mapping(UUID nodeId, GridDhtAtomicUpdateRequest 
req);
+
+    /**
+     * Get mapping for the given node ID.
+     *
+     * @param nodeId Node ID.
+     * @return Mapping (if any).
+     */
+    @Nullable protected abstract GridDhtAtomicUpdateRequest mapping(UUID 
nodeId);
+
+    /**
+     * @return Mappings number.
+     */
+    protected abstract int mappingsCount();
+
+    /**
+     * Add near reader entry.
+     *
+     * @param key Key.
+     * @param entry Near reader entry.
+     */
+    protected abstract void nearReaderEntry(KeyCacheObject key, 
GridDhtCacheEntry entry);
+
+    /**
+     * Get near reader entry.
+     *
+     * @param key Key.
+     * @return Near reader entry.
+     */
+    protected abstract GridDhtCacheEntry nearReaderEntry(KeyCacheObject key);
+
+    /**
+     * Send DHT request.
+     *
+     * @param req Request.
+     */
+    protected void sendRequest(GridDhtAtomicUpdateRequest req) {
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Sending DHT atomic update request [nodeId=" + 
req.nodeId() + ", req=" + req + ']');
+
+            cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+        }
+        catch (ClusterTopologyCheckedException ignored) {
+            U.warn(log, "Failed to send update request to backup node because 
it left grid: " +
+                req.nodeId());
+
+            registerResponse(req.nodeId());
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send update request to backup node (did 
node leave the grid?): "
+                + req.nodeId(), e);
+
+            registerResponse(req.nodeId());
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @return {@code True} if request found.
+     */
+    private boolean registerResponse(UUID nodeId) {
+        int resCnt0;
+
+        GridDhtAtomicUpdateRequest req = mapping(nodeId);
+
+        if (req != null) {
+            synchronized (this) {
+                if (req.onResponse()) {
+                    resCnt0 = resCnt;
+
+                    resCnt0 += 1;
+
+                    resCnt = resCnt0;
+                }
+                else
+                    return false;
+            }
+
+            if (resCnt0 == mappingsCount())
+                onDone();
+
+            return true;
+        }
+
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6760696/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 0043bf1..bf118ab 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -20,67 +20,25 @@ package 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.cache.processor.EntryProcessor;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-
 /**
  * DHT atomic cache backup update future.
  */
-public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
-    implements GridCacheAtomicFuture<Void> {
+public class GridDhtAtomicUpdateFuture extends 
GridDhtAtomicAbstractUpdateFuture {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Logger reference. */
-    private static final AtomicReference<IgniteLogger> logRef = new 
AtomicReference<>();
-
-    /** Logger. */
-    protected static IgniteLogger log;
-
-    /** Cache context. */
-    private final GridCacheContext cctx;
-
-    /** Future version. */
-    private final GridCacheVersion futVer;
-
-    /** Write version. */
-    private final GridCacheVersion writeVer;
-
-    /** Force transform backup flag. */
-    private boolean forceTransformBackups;
-
-    /** Completion callback. */
-    @GridToStringExclude
-    private final CI2<GridNearAtomicAbstractUpdateRequest, 
GridNearAtomicAbstractUpdateResponse> completionCb;
-
     /** Mappings. */
     @GridToStringInclude
     private final Map<UUID, GridDhtAtomicUpdateRequest> mappings;
@@ -88,24 +46,9 @@ public class GridDhtAtomicUpdateFuture extends 
GridFutureAdapter<Void>
     /** Entries with readers. */
     private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
 
-    /** Update request. */
-    private final GridNearAtomicAbstractUpdateRequest updateReq;
-
-    /** Update response. */
-    private final GridNearAtomicAbstractUpdateResponse updateRes;
-
     /** Future keys. */
     private final Collection<KeyCacheObject> keys;
 
-    /** Continuous query closures. */
-    private Collection<CI1<Boolean>> cntQryClsrs;
-
-    /** */
-    private final boolean waitForExchange;
-
-    /** Response count. */
-    private volatile int resCnt;
-
     /**
      * @param cctx Cache context.
      * @param completionCb Callback to invoke when future is completed.
@@ -120,339 +63,61 @@ public class GridDhtAtomicUpdateFuture extends 
GridFutureAdapter<Void>
         GridNearAtomicAbstractUpdateRequest updateReq,
         GridNearAtomicAbstractUpdateResponse updateRes
     ) {
-        this.cctx = cctx;
-        this.writeVer = writeVer;
-
-        futVer = cctx.versions().next(updateReq.topologyVersion());
-        this.updateReq = updateReq;
-        this.completionCb = completionCb;
-        this.updateRes = updateRes;
-
-        if (log == null)
-            log = U.logger(cctx.kernalContext(), logRef, 
GridDhtAtomicUpdateFuture.class);
+        super(cctx, updateReq, updateRes, completionCb, writeVer);
 
         keys = new ArrayList<>(updateReq.keysCount());
         mappings = U.newHashMap(updateReq.keysCount());
-
-        waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() 
&& !updateReq.clientRequest()));
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteUuid futureId() {
-        return futVer.asGridUuid();
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheVersion version() {
-        return futVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onNodeLeft(UUID nodeId) {
-        if (log.isDebugEnabled())
-            log.debug("Processing node leave event [fut=" + this + ", nodeId=" 
+ nodeId + ']');
-
-        return registerResponse(nodeId);
     }
 
     /**
-     * @param nodeId Node ID.
-     * @return {@code True} if request found.
+     * Add key.
+     *
+     * @param key Key.
      */
-    private boolean registerResponse(UUID nodeId) {
-        int resCnt0;
-
-        GridDhtAtomicUpdateRequest req = mappings.get(nodeId);
-
-        if (req != null) {
-            synchronized (this) {
-                if (req.onResponse()) {
-                    resCnt0 = resCnt;
-
-                    resCnt0 += 1;
-
-                    resCnt = resCnt0;
-                }
-                else
-                    return false;
-            }
-
-            if (resCnt0 == mappings.size())
-                onDone();
-
-            return true;
-        }
-
-        return false;
+    protected void addKey(KeyCacheObject key) {
+        keys.add(key);
     }
 
     /** {@inheritDoc} */
-    @Override public boolean trackable() {
-        return true;
+    @Override protected void markAllKeysFailed(@Nullable Throwable err) {
+        for (KeyCacheObject key : keys)
+            updateRes.addFailedKey(key, err);
     }
 
     /** {@inheritDoc} */
-    @Override public void markNotTrackable() {
-        // No-op.
+    @Override protected void map0() {
+        for (GridDhtAtomicUpdateRequest req : mappings.values())
+            sendRequest(req);
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Void> 
completeFuture(AffinityTopologyVersion topVer) {
-        if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 
0)
-            return this;
-
-        return null;
-    }
-
-    /**
-     * @param entry Entry to map.
-     * @param val Value to write.
-     * @param entryProcessor Entry processor.
-     * @param ttl TTL (optional).
-     * @param conflictExpireTime Conflict expire time (optional).
-     * @param conflictVer Conflict version (optional).
-     * @param addPrevVal If {@code true} sends previous value to backups.
-     * @param prevVal Previous value.
-     * @param updateCntr Partition update counter.
-     */
-    public void addWriteEntry(GridDhtCacheEntry entry,
-        @Nullable CacheObject val,
-        EntryProcessor<Object, Object, Object> entryProcessor,
-        long ttl,
-        long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer,
-        boolean addPrevVal,
-        @Nullable CacheObject prevVal,
-        long updateCntr) {
-        AffinityTopologyVersion topVer = updateReq.topologyVersion();
-
-        Collection<ClusterNode> dhtNodes = 
cctx.dht().topology().nodes(entry.partition(), topVer);
-
-        if (log.isDebugEnabled())
-            log.debug("Mapping entry to DHT nodes [nodes=" + 
U.nodeIds(dhtNodes) + ", entry=" + entry + ']');
-
-        CacheWriteSynchronizationMode syncMode = 
updateReq.writeSynchronizationMode();
-
-        keys.add(entry.key());
-
-        for (ClusterNode node : dhtNodes) {
-            UUID nodeId = node.id();
-
-            if (!nodeId.equals(cctx.localNodeId())) {
-                GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
-
-                if (updateReq == null) {
-                    updateReq = new GridDhtAtomicUpdateRequest(
-                        cctx.cacheId(),
-                        nodeId,
-                        futVer,
-                        writeVer,
-                        syncMode,
-                        topVer,
-                        forceTransformBackups,
-                        this.updateReq.subjectId(),
-                        this.updateReq.taskNameHash(),
-                        forceTransformBackups ? 
this.updateReq.invokeArguments() : null,
-                        cctx.deploymentEnabled(),
-                        this.updateReq.keepBinary());
-
-                    mappings.put(nodeId, updateReq);
-                }
-
-                updateReq.addWriteValue(entry.key(),
-                    val,
-                    entryProcessor,
-                    ttl,
-                    conflictExpireTime,
-                    conflictVer,
-                    addPrevVal,
-                    entry.partition(),
-                    prevVal,
-                    updateCntr);
-            }
-        }
-    }
-
-    /**
-     * @param readers Entry readers.
-     * @param entry Entry.
-     * @param val Value.
-     * @param entryProcessor Entry processor..
-     * @param ttl TTL for near cache update (optional).
-     * @param expireTime Expire time for near cache update (optional).
-     */
-    public void addNearWriteEntries(Iterable<UUID> readers,
-        GridDhtCacheEntry entry,
-        @Nullable CacheObject val,
-        EntryProcessor<Object, Object, Object> entryProcessor,
-        long ttl,
-        long expireTime) {
-        CacheWriteSynchronizationMode syncMode = 
updateReq.writeSynchronizationMode();
-
-        keys.add(entry.key());
-
-        AffinityTopologyVersion topVer = updateReq.topologyVersion();
-
-        for (UUID nodeId : readers) {
-            GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
-
-            if (updateReq == null) {
-                ClusterNode node = cctx.discovery().node(nodeId);
-
-                // Node left the grid.
-                if (node == null)
-                    continue;
-
-                updateReq = new GridDhtAtomicUpdateRequest(
-                    cctx.cacheId(),
-                    nodeId,
-                    futVer,
-                    writeVer,
-                    syncMode,
-                    topVer,
-                    forceTransformBackups,
-                    this.updateReq.subjectId(),
-                    this.updateReq.taskNameHash(),
-                    forceTransformBackups ? this.updateReq.invokeArguments() : 
null,
-                    cctx.deploymentEnabled(),
-                    this.updateReq.keepBinary());
-
-                mappings.put(nodeId, updateReq);
-            }
-
-            if (nearReadersEntries == null)
-                nearReadersEntries = new HashMap<>();
-
-            nearReadersEntries.put(entry.key(), entry);
-
-            updateReq.addNearWriteValue(entry.key(),
-                val,
-                entryProcessor,
-                ttl,
-                expireTime);
-        }
-    }
-
-    /**
-     * @param clsr Continuous query closure.
-     */
-    public void addContinuousQueryClosure(CI1<Boolean> clsr){
-        assert !isDone() : this;
-
-        if (cntQryClsrs == null)
-            cntQryClsrs = new ArrayList<>(10);
-
-        cntQryClsrs.add(clsr);
+    @Override protected void mapping(UUID nodeId, GridDhtAtomicUpdateRequest 
req) {
+        mappings.put(nodeId, req);
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onDone(@Nullable Void res, @Nullable Throwable 
err) {
-        if (super.onDone(res, err)) {
-            cctx.mvcc().removeAtomicFuture(version());
-
-            boolean suc = err == null;
-
-            if (!suc) {
-                for (KeyCacheObject key : keys)
-                    updateRes.addFailedKey(key, err);
-            }
-
-            if (cntQryClsrs != null) {
-                for (CI1<Boolean> clsr : cntQryClsrs)
-                    clsr.apply(suc);
-            }
-
-            if (updateReq.writeSynchronizationMode() == FULL_SYNC)
-                completionCb.apply(updateReq, updateRes);
-
-            return true;
-        }
-
-        return false;
+    @Override @Nullable protected GridDhtAtomicUpdateRequest mapping(UUID 
nodeId) {
+        return mappings.get(nodeId);
     }
 
-    /**
-     * Sends requests to remote nodes.
-     */
-    public void map() {
-        if (!mappings.isEmpty()) {
-            for (GridDhtAtomicUpdateRequest req : mappings.values()) {
-                try {
-                    if (log.isDebugEnabled())
-                        log.debug("Sending DHT atomic update request [nodeId=" 
+ req.nodeId() + ", req=" + req + ']');
-
-                    cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
-                }
-                catch (ClusterTopologyCheckedException ignored) {
-                    U.warn(log, "Failed to send update request to backup node 
because it left grid: " +
-                        req.nodeId());
-
-                    registerResponse(req.nodeId());
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to send update request to backup node 
(did node leave the grid?): "
-                        + req.nodeId(), e);
-
-                    registerResponse(req.nodeId());
-                }
-            }
-        }
-        else
-            onDone();
-
-        // Send response right away if no ACKs from backup is required.
-        // Backups will send ACKs anyway, future will be completed after all 
backups have replied.
-        if (updateReq.writeSynchronizationMode() != FULL_SYNC)
-            completionCb.apply(updateReq, updateRes);
+    /** {@inheritDoc} */
+    @Override protected int mappingsCount() {
+        return mappings != null ? mappings.size() : 0;
     }
 
-    /**
-     * Callback for backup update response.
-     *
-     * @param nodeId Backup node ID.
-     * @param updateRes Update response.
-     */
-    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-    public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) {
-        if (log.isDebugEnabled())
-            log.debug("Received DHT atomic update future result [nodeId=" + 
nodeId + ", updateRes=" + updateRes + ']');
-
-        if (updateRes.error() != null) {
-            List<KeyCacheObject> failed = new 
ArrayList<>(updateRes.failedCount());
-
-            for (int i = 0; i < updateRes.failedCount(); i++)
-                failed.add(updateRes.failed(i));
-
-            this.updateRes.addFailedKeys(failed, updateRes.error());
-        }
-
-        for (int i = 0; i < updateRes.nearEvictedCount(); i++) {
-            KeyCacheObject key = updateRes.nearEvicted(i);
-
-            GridDhtCacheEntry entry = nearReadersEntries.get(key);
-
-            try {
-                entry.removeReader(nodeId, updateRes.messageId());
-            }
-            catch (GridCacheEntryRemovedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Entry with evicted reader was removed [entry=" 
+ entry + ", err=" + e + ']');
-            }
-        }
+    /** {@inheritDoc} */
+    @Override protected void nearReaderEntry(KeyCacheObject key, 
GridDhtCacheEntry entry) {
+        if (nearReadersEntries == null)
+            nearReadersEntries = new HashMap<>();
 
-        registerResponse(nodeId);
+        nearReadersEntries.put(entry.key(), entry);
     }
 
-    /**
-     * Deferred update response.
-     *
-     * @param nodeId Backup node ID.
-     */
-    public void onResult(UUID nodeId) {
-        if (log.isDebugEnabled())
-            log.debug("Received deferred DHT atomic update future result 
[nodeId=" + nodeId + ']');
+    /** {@inheritDoc} */
+    @Override protected GridDhtCacheEntry nearReaderEntry(KeyCacheObject key) {
+        assert nearReadersEntries != null;
 
-        registerResponse(nodeId);
+        return nearReadersEntries.get(key);
     }
 
     /** {@inheritDoc} */

Reply via email to