Refactored DHT future.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a6760696 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a6760696 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a6760696 Branch: refs/heads/ignite-2523-1-resp-dht Commit: a6760696c106b7d62e6bedca6e80b96b579ef4d1 Parents: 9833dab Author: vozerov-gridgain <[email protected]> Authored: Thu Apr 28 15:56:04 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Thu Apr 28 15:56:04 2016 +0300 ---------------------------------------------------------------------- .../GridDhtAtomicAbstractUpdateFuture.java | 492 +++++++++++++++++++ .../dht/atomic/GridDhtAtomicUpdateFuture.java | 393 ++------------- 2 files changed, 521 insertions(+), 364 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a6760696/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java new file mode 100644 index 0000000..4e166b1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java @@ -0,0 +1,492 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.CI2; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +import javax.cache.processor.EntryProcessor; +import 
java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Abstract DHT atomic update future. + */ +public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapter<Void> + implements GridCacheAtomicFuture<Void> { + /** */ + private static final long serialVersionUID = 0L; + + /** Logger reference. */ + protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + + /** Logger. */ + protected static IgniteLogger log; + + /** Future version. */ + protected final GridCacheVersion futVer; + + /** Cache context. */ + protected final GridCacheContext cctx; + + /** Update request. */ + protected final GridNearAtomicAbstractUpdateRequest updateReq; + + /** Update response. */ + protected final GridNearAtomicAbstractUpdateResponse updateRes; + + /** Completion callback. */ + @GridToStringExclude + protected final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicAbstractUpdateResponse> completionCb; + + /** Write version. */ + protected final GridCacheVersion writeVer; + + /** */ + protected final boolean waitForExchange; + + /** Force transform backup flag. */ + protected boolean forceTransformBackups; + + /** Response count. */ + protected volatile int resCnt; + + /** Continuous query closures. */ + // TODO: Optimize. + private Collection<CI1<Boolean>> cntQryClsrs; + + /** + * Constructor. + * + * @param cctx Cache context. + * @param updateReq Near request. + * @param updateRes Near response. + * @param completionCb Completion callback. + * @param writeVer Write version. 
+ */ + protected GridDhtAtomicAbstractUpdateFuture( + GridCacheContext cctx, + GridNearAtomicAbstractUpdateRequest updateReq, + GridNearAtomicAbstractUpdateResponse updateRes, + CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicAbstractUpdateResponse> completionCb, + GridCacheVersion writeVer) { + if (log == null) + log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class); + + futVer = cctx.versions().next(updateReq.topologyVersion()); + + this.cctx = cctx; + this.updateReq = updateReq; + this.updateRes = updateRes; + this.completionCb = completionCb; + this.writeVer = writeVer; + + waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest())); + } + + /** + * @param entry Entry to map. + * @param val Value to write. + * @param entryProcessor Entry processor. + * @param ttl TTL (optional). + * @param conflictExpireTime Conflict expire time (optional). + * @param conflictVer Conflict version (optional). + * @param addPrevVal If {@code true} sends previous value to backups. + * @param prevVal Previous value. + * @param updateCntr Partition update counter. + */ + public void addWriteEntry(GridDhtCacheEntry entry, + @Nullable CacheObject val, + EntryProcessor<Object, Object, Object> entryProcessor, + long ttl, + long conflictExpireTime, + @Nullable GridCacheVersion conflictVer, + boolean addPrevVal, + @Nullable CacheObject prevVal, + long updateCntr) { + Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), updateReq.topologyVersion()); + + if (log.isDebugEnabled()) + log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']'); + + addKey(entry.key()); + + // TODO: Avoid iteration, we usually will have only one node here. 
+ for (ClusterNode node : dhtNodes) { + UUID nodeId = node.id(); + + if (!nodeId.equals(cctx.localNodeId())) { + GridDhtAtomicUpdateRequest req = mapping(nodeId); + + if (req == null) { + req = new GridDhtAtomicUpdateRequest( + cctx.cacheId(), + nodeId, + futVer, + writeVer, + updateReq.writeSynchronizationMode(), + updateReq.topologyVersion(), + forceTransformBackups, + this.updateReq.subjectId(), + this.updateReq.taskNameHash(), + forceTransformBackups ? this.updateReq.invokeArguments() : null, + cctx.deploymentEnabled(), + this.updateReq.keepBinary()); + + mapping(nodeId, req); + } + + req.addWriteValue(entry.key(), + val, + entryProcessor, + ttl, + conflictExpireTime, + conflictVer, + addPrevVal, + entry.partition(), + prevVal, + updateCntr); + } + } + } + + /** + * @param readers Entry readers. + * @param entry Entry. + * @param val Value. + * @param entryProcessor Entry processor. + * @param ttl TTL for near cache update (optional). + * @param expireTime Expire time for near cache update (optional). + */ + public void addNearWriteEntries(Iterable<UUID> readers, + GridDhtCacheEntry entry, + @Nullable CacheObject val, + EntryProcessor<Object, Object, Object> entryProcessor, + long ttl, + long expireTime) { + addKey(entry.key()); + + for (UUID nodeId : readers) { + GridDhtAtomicUpdateRequest req = mapping(nodeId); + + if (req == null) { + ClusterNode node = cctx.discovery().node(nodeId); + + // Node left the grid. + if (node == null) + continue; + + req = new GridDhtAtomicUpdateRequest( + cctx.cacheId(), + nodeId, + futVer, + writeVer, + updateReq.writeSynchronizationMode(), + updateReq.topologyVersion(), + forceTransformBackups, + updateReq.subjectId(), + updateReq.taskNameHash(), + forceTransformBackups ? 
updateReq.invokeArguments() : null, + cctx.deploymentEnabled(), + updateReq.keepBinary()); + + mapping(nodeId, req); + } + + nearReaderEntry(entry.key(), entry); + + req.addNearWriteValue(entry.key(), + val, + entryProcessor, + ttl, + expireTime); + } + } + + /** {@inheritDoc} */ + @Override public IgniteUuid futureId() { + return futVer.asGridUuid(); + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion version() { + return futVer; + } + + /** {@inheritDoc} */ + @Override public boolean trackable() { + return true; + } + + /** {@inheritDoc} */ + @Override public void markNotTrackable() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { + if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 0) + return this; + + return null; + } + + /** {@inheritDoc} */ + @Override public boolean onNodeLeft(UUID nodeId) { + if (log.isDebugEnabled()) + log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']'); + + return registerResponse(nodeId); + } + + /** + * Callback for backup update response. + * + * @param nodeId Backup node ID. + * @param updateRes Update response. 
+ */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) { + if (log.isDebugEnabled()) + log.debug("Received DHT atomic update future result [nodeId=" + nodeId + ", updateRes=" + updateRes + ']'); + + if (updateRes.error() != null) { + List<KeyCacheObject> failed = new ArrayList<>(updateRes.failedCount()); + + for (int i = 0; i < updateRes.failedCount(); i++) + failed.add(updateRes.failed(i)); + + this.updateRes.addFailedKeys(failed, updateRes.error()); + } + + for (int i = 0; i < updateRes.nearEvictedCount(); i++) { + KeyCacheObject key = updateRes.nearEvicted(i); + + GridDhtCacheEntry entry = nearReaderEntry(key); + + try { + entry.removeReader(nodeId, updateRes.messageId()); + } + catch (GridCacheEntryRemovedException e) { + if (log.isDebugEnabled()) + log.debug("Entry with evicted reader was removed [entry=" + entry + ", err=" + e + ']'); + } + } + + registerResponse(nodeId); + } + + /** + * Deferred update response. + * + * @param nodeId Backup node ID. + */ + public void onResult(UUID nodeId) { + if (log.isDebugEnabled()) + log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']'); + + registerResponse(nodeId); + } + + /** + * Sends requests to remote nodes. + */ + public void map() { + if (mappingsCount() > 0) + map0(); + else + onDone(); + + // Send response right away if no ACKs from backups are required. + // Backups will send ACKs anyway; the future will be completed after all backups have replied. + if (updateReq.writeSynchronizationMode() != FULL_SYNC) + completionCb.apply(updateReq, updateRes); + } + + /** + * @param clsr Continuous query closure. 
+ */ + public void addContinuousQueryClosure(CI1<Boolean> clsr){ + assert !isDone() : this; + + if (cntQryClsrs == null) + cntQryClsrs = new ArrayList<>(10); + + cntQryClsrs.add(clsr); + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + cctx.mvcc().removeAtomicFuture(version()); + + boolean suc = err == null; + + if (!suc) + markAllKeysFailed(err); + + if (cntQryClsrs != null) { + for (CI1<Boolean> clsr : cntQryClsrs) + clsr.apply(suc); + } + + if (updateReq.writeSynchronizationMode() == FULL_SYNC) + completionCb.apply(updateReq, updateRes); + + return true; + } + + return false; + } + + /** + * Add key. + * + * @param key Key. + */ + protected abstract void addKey(KeyCacheObject key); + + /** + * Mark all request keys as failed. + * + * @param err Error. + */ + protected abstract void markAllKeysFailed(@Nullable Throwable err); + + /** + * Internal mapping routine. + */ + protected abstract void map0(); + + /** + * Add mapping. + * + * @param nodeId Node ID. + * @param req Request. + */ + protected abstract void mapping(UUID nodeId, GridDhtAtomicUpdateRequest req); + + /** + * Get mapping for the given node ID. + * + * @param nodeId Node ID. + * @return Mapping (if any). + */ + @Nullable protected abstract GridDhtAtomicUpdateRequest mapping(UUID nodeId); + + /** + * @return Mappings number. + */ + protected abstract int mappingsCount(); + + /** + * Add near reader entry. + * + * @param key Key. + * @param entry Near reader entry. + */ + protected abstract void nearReaderEntry(KeyCacheObject key, GridDhtCacheEntry entry); + + /** + * Get near reader entry. + * + * @param key Key. + * @return Near reader entry. + */ + protected abstract GridDhtCacheEntry nearReaderEntry(KeyCacheObject key); + + /** + * Send DHT request. + * + * @param req Request. 
+ */ + protected void sendRequest(GridDhtAtomicUpdateRequest req) { + try { + if (log.isDebugEnabled()) + log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); + + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + } + catch (ClusterTopologyCheckedException ignored) { + U.warn(log, "Failed to send update request to backup node because it left grid: " + + req.nodeId()); + + registerResponse(req.nodeId()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send update request to backup node (did node leave the grid?): " + + req.nodeId(), e); + + registerResponse(req.nodeId()); + } + } + + /** + * @param nodeId Node ID. + * @return {@code True} if request found. + */ + private boolean registerResponse(UUID nodeId) { + int resCnt0; + + GridDhtAtomicUpdateRequest req = mapping(nodeId); + + if (req != null) { + synchronized (this) { + if (req.onResponse()) { + resCnt0 = resCnt; + + resCnt0 += 1; + + resCnt = resCnt0; + } + else + return false; + } + + if (resCnt0 == mappingsCount()) + onDone(); + + return true; + } + + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a6760696/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 0043bf1..bf118ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -20,67 +20,25 @@ package 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; -import javax.cache.processor.EntryProcessor; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; - /** * DHT atomic cache backup update future. 
*/ -public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> - implements GridCacheAtomicFuture<Void> { +public class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture { /** */ private static final long serialVersionUID = 0L; - /** Logger reference. */ - private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); - - /** Logger. */ - protected static IgniteLogger log; - - /** Cache context. */ - private final GridCacheContext cctx; - - /** Future version. */ - private final GridCacheVersion futVer; - - /** Write version. */ - private final GridCacheVersion writeVer; - - /** Force transform backup flag. */ - private boolean forceTransformBackups; - - /** Completion callback. */ - @GridToStringExclude - private final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicAbstractUpdateResponse> completionCb; - /** Mappings. */ @GridToStringInclude private final Map<UUID, GridDhtAtomicUpdateRequest> mappings; @@ -88,24 +46,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** Entries with readers. */ private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries; - /** Update request. */ - private final GridNearAtomicAbstractUpdateRequest updateReq; - - /** Update response. */ - private final GridNearAtomicAbstractUpdateResponse updateRes; - /** Future keys. */ private final Collection<KeyCacheObject> keys; - /** Continuous query closures. */ - private Collection<CI1<Boolean>> cntQryClsrs; - - /** */ - private final boolean waitForExchange; - - /** Response count. */ - private volatile int resCnt; - /** * @param cctx Cache context. * @param completionCb Callback to invoke when future is completed. 
@@ -120,339 +63,61 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> GridNearAtomicAbstractUpdateRequest updateReq, GridNearAtomicAbstractUpdateResponse updateRes ) { - this.cctx = cctx; - this.writeVer = writeVer; - - futVer = cctx.versions().next(updateReq.topologyVersion()); - this.updateReq = updateReq; - this.completionCb = completionCb; - this.updateRes = updateRes; - - if (log == null) - log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class); + super(cctx, updateReq, updateRes, completionCb, writeVer); keys = new ArrayList<>(updateReq.keysCount()); mappings = U.newHashMap(updateReq.keysCount()); - - waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest())); - } - - /** {@inheritDoc} */ - @Override public IgniteUuid futureId() { - return futVer.asGridUuid(); - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion version() { - return futVer; - } - - /** {@inheritDoc} */ - @Override public boolean onNodeLeft(UUID nodeId) { - if (log.isDebugEnabled()) - log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']'); - - return registerResponse(nodeId); } /** - * @param nodeId Node ID. - * @return {@code True} if request found. + * Add key. + * + * @param key Key. 
*/ - private boolean registerResponse(UUID nodeId) { - int resCnt0; - - GridDhtAtomicUpdateRequest req = mappings.get(nodeId); - - if (req != null) { - synchronized (this) { - if (req.onResponse()) { - resCnt0 = resCnt; - - resCnt0 += 1; - - resCnt = resCnt0; - } - else - return false; - } - - if (resCnt0 == mappings.size()) - onDone(); - - return true; - } - - return false; + protected void addKey(KeyCacheObject key) { + keys.add(key); } /** {@inheritDoc} */ - @Override public boolean trackable() { - return true; + @Override protected void markAllKeysFailed(@Nullable Throwable err) { + for (KeyCacheObject key : keys) + updateRes.addFailedKey(key, err); } /** {@inheritDoc} */ - @Override public void markNotTrackable() { - // No-op. + @Override protected void map0() { + for (GridDhtAtomicUpdateRequest req : mappings.values()) + sendRequest(req); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { - if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 0) - return this; - - return null; - } - - /** - * @param entry Entry to map. - * @param val Value to write. - * @param entryProcessor Entry processor. - * @param ttl TTL (optional). - * @param conflictExpireTime Conflict expire time (optional). - * @param conflictVer Conflict version (optional). - * @param addPrevVal If {@code true} sends previous value to backups. - * @param prevVal Previous value. - * @param updateCntr Partition update counter. 
- */ - public void addWriteEntry(GridDhtCacheEntry entry, - @Nullable CacheObject val, - EntryProcessor<Object, Object, Object> entryProcessor, - long ttl, - long conflictExpireTime, - @Nullable GridCacheVersion conflictVer, - boolean addPrevVal, - @Nullable CacheObject prevVal, - long updateCntr) { - AffinityTopologyVersion topVer = updateReq.topologyVersion(); - - Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer); - - if (log.isDebugEnabled()) - log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']'); - - CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode(); - - keys.add(entry.key()); - - for (ClusterNode node : dhtNodes) { - UUID nodeId = node.id(); - - if (!nodeId.equals(cctx.localNodeId())) { - GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId); - - if (updateReq == null) { - updateReq = new GridDhtAtomicUpdateRequest( - cctx.cacheId(), - nodeId, - futVer, - writeVer, - syncMode, - topVer, - forceTransformBackups, - this.updateReq.subjectId(), - this.updateReq.taskNameHash(), - forceTransformBackups ? this.updateReq.invokeArguments() : null, - cctx.deploymentEnabled(), - this.updateReq.keepBinary()); - - mappings.put(nodeId, updateReq); - } - - updateReq.addWriteValue(entry.key(), - val, - entryProcessor, - ttl, - conflictExpireTime, - conflictVer, - addPrevVal, - entry.partition(), - prevVal, - updateCntr); - } - } - } - - /** - * @param readers Entry readers. - * @param entry Entry. - * @param val Value. - * @param entryProcessor Entry processor.. - * @param ttl TTL for near cache update (optional). - * @param expireTime Expire time for near cache update (optional). 
- */ - public void addNearWriteEntries(Iterable<UUID> readers, - GridDhtCacheEntry entry, - @Nullable CacheObject val, - EntryProcessor<Object, Object, Object> entryProcessor, - long ttl, - long expireTime) { - CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode(); - - keys.add(entry.key()); - - AffinityTopologyVersion topVer = updateReq.topologyVersion(); - - for (UUID nodeId : readers) { - GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId); - - if (updateReq == null) { - ClusterNode node = cctx.discovery().node(nodeId); - - // Node left the grid. - if (node == null) - continue; - - updateReq = new GridDhtAtomicUpdateRequest( - cctx.cacheId(), - nodeId, - futVer, - writeVer, - syncMode, - topVer, - forceTransformBackups, - this.updateReq.subjectId(), - this.updateReq.taskNameHash(), - forceTransformBackups ? this.updateReq.invokeArguments() : null, - cctx.deploymentEnabled(), - this.updateReq.keepBinary()); - - mappings.put(nodeId, updateReq); - } - - if (nearReadersEntries == null) - nearReadersEntries = new HashMap<>(); - - nearReadersEntries.put(entry.key(), entry); - - updateReq.addNearWriteValue(entry.key(), - val, - entryProcessor, - ttl, - expireTime); - } - } - - /** - * @param clsr Continuous query closure. 
- */ - public void addContinuousQueryClosure(CI1<Boolean> clsr){ - assert !isDone() : this; - - if (cntQryClsrs == null) - cntQryClsrs = new ArrayList<>(10); - - cntQryClsrs.add(clsr); + @Override protected void mapping(UUID nodeId, GridDhtAtomicUpdateRequest req) { + mappings.put(nodeId, req); } /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) { - if (super.onDone(res, err)) { - cctx.mvcc().removeAtomicFuture(version()); - - boolean suc = err == null; - - if (!suc) { - for (KeyCacheObject key : keys) - updateRes.addFailedKey(key, err); - } - - if (cntQryClsrs != null) { - for (CI1<Boolean> clsr : cntQryClsrs) - clsr.apply(suc); - } - - if (updateReq.writeSynchronizationMode() == FULL_SYNC) - completionCb.apply(updateReq, updateRes); - - return true; - } - - return false; + @Override @Nullable protected GridDhtAtomicUpdateRequest mapping(UUID nodeId) { + return mappings.get(nodeId); } - /** - * Sends requests to remote nodes. - */ - public void map() { - if (!mappings.isEmpty()) { - for (GridDhtAtomicUpdateRequest req : mappings.values()) { - try { - if (log.isDebugEnabled()) - log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); - } - catch (ClusterTopologyCheckedException ignored) { - U.warn(log, "Failed to send update request to backup node because it left grid: " + - req.nodeId()); - - registerResponse(req.nodeId()); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send update request to backup node (did node leave the grid?): " - + req.nodeId(), e); - - registerResponse(req.nodeId()); - } - } - } - else - onDone(); - - // Send response right away if no ACKs from backup is required. - // Backups will send ACKs anyway, future will be completed after all backups have replied. 
- if (updateReq.writeSynchronizationMode() != FULL_SYNC) - completionCb.apply(updateReq, updateRes); + /** {@inheritDoc} */ + @Override protected int mappingsCount() { + return mappings != null ? mappings.size() : 0; } - /** - * Callback for backup update response. - * - * @param nodeId Backup node ID. - * @param updateRes Update response. - */ - @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) { - if (log.isDebugEnabled()) - log.debug("Received DHT atomic update future result [nodeId=" + nodeId + ", updateRes=" + updateRes + ']'); - - if (updateRes.error() != null) { - List<KeyCacheObject> failed = new ArrayList<>(updateRes.failedCount()); - - for (int i = 0; i < updateRes.failedCount(); i++) - failed.add(updateRes.failed(i)); - - this.updateRes.addFailedKeys(failed, updateRes.error()); - } - - for (int i = 0; i < updateRes.nearEvictedCount(); i++) { - KeyCacheObject key = updateRes.nearEvicted(i); - - GridDhtCacheEntry entry = nearReadersEntries.get(key); - - try { - entry.removeReader(nodeId, updateRes.messageId()); - } - catch (GridCacheEntryRemovedException e) { - if (log.isDebugEnabled()) - log.debug("Entry with evicted reader was removed [entry=" + entry + ", err=" + e + ']'); - } - } + /** {@inheritDoc} */ + @Override protected void nearReaderEntry(KeyCacheObject key, GridDhtCacheEntry entry) { + if (nearReadersEntries == null) + nearReadersEntries = new HashMap<>(); - registerResponse(nodeId); + nearReadersEntries.put(entry.key(), entry); } - /** - * Deferred update response. - * - * @param nodeId Backup node ID. 
- */ - public void onResult(UUID nodeId) { - if (log.isDebugEnabled()) - log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']'); + /** {@inheritDoc} */ + @Override protected GridDhtCacheEntry nearReaderEntry(KeyCacheObject key) { + assert nearReadersEntries != null; - registerResponse(nodeId); + return nearReadersEntries.get(key); } /** {@inheritDoc} */
