Repository: ignite Updated Branches: refs/heads/ignite-4705 1b5dcceab -> f81e774b9
ignite-4705 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/11e8776a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/11e8776a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/11e8776a Branch: refs/heads/ignite-4705 Commit: 11e8776a770d8ce5be0208fe82e65362760ed4cf Parents: ba08585 Author: sboikov <[email protected]> Authored: Tue Feb 21 18:05:52 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Feb 21 18:05:52 2017 +0300 ---------------------------------------------------------------------- .../GridDhtAtomicAbstractUpdateFuture.java | 77 ++++++++++++++++---- .../GridDhtAtomicAbstractUpdateRequest.java | 3 + .../dht/atomic/GridDhtAtomicCache.java | 21 ++---- .../dht/atomic/GridDhtAtomicNearResponse.java | 68 ++++++++++++++--- .../atomic/GridDhtAtomicSingleUpdateFuture.java | 3 - .../dht/atomic/GridDhtAtomicUpdateFuture.java | 4 - .../GridNearAtomicAbstractUpdateFuture.java | 7 ++ 7 files changed, 134 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/11e8776a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java index 96bfcb4..79fb7fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java @@ -40,17 +40,14 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; @@ -80,12 +77,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte /** Future version. */ protected final long futId; - /** Completion callback. */ - @GridToStringExclude - private final GridDhtAtomicCache.UpdateReplyClosure completionCb; - /** Update request. */ - protected final GridNearAtomicAbstractUpdateRequest updateReq; + final GridNearAtomicAbstractUpdateRequest updateReq; /** Update response. */ final GridNearAtomicUpdateResponse updateRes; @@ -100,16 +93,17 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte /** Response count. */ private volatile int resCnt; + /** */ + private boolean repliedToNear; + /** * @param cctx Cache context. - * @param completionCb Callback to invoke when future is completed. * @param writeVer Write version. * @param updateReq Update request. * @param updateRes Update response. */ protected GridDhtAtomicAbstractUpdateFuture( GridCacheContext cctx, - GridDhtAtomicCache.UpdateReplyClosure completionCb, GridCacheVersion writeVer, GridNearAtomicAbstractUpdateRequest updateReq, GridNearAtomicUpdateResponse updateRes @@ -117,7 +111,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte this.cctx = cctx; this.updateReq = updateReq; - this.completionCb = completionCb; this.updateRes = updateRes; this.writeVer = writeVer; @@ -235,6 +228,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte protected abstract void addNearKey(KeyCacheObject key, Collection<UUID> readers); /** + * @param nearNodeId Near node ID. * @param readers Entry readers. * @param entry Entry. * @param val Value. @@ -336,9 +330,14 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte GridDhtAtomicAbstractUpdateRequest req = mappings != null ? mappings.get(nodeId) : null; + boolean needReplyToNear = false; + if (req != null) { synchronized (this) { if (req.onResponse()) { + if (nodeErr && !repliedToNear) + needReplyToNear = repliedToNear = true; + resCnt0 = resCnt; resCnt0 += 1; @@ -349,6 +348,51 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte return false; } + if (needReplyToNear) { + assert !F.isEmpty(mappings); + + List<UUID> dhtNodes = new ArrayList<>(mappings.size()); + + dhtNodes.addAll(mappings.keySet()); + + GridDhtAtomicNearResponse res = new GridDhtAtomicNearResponse(cctx.cacheId(), + req.partition(), + req.futureId(), + cctx.localNodeId(), + dhtNodes, + req.flags()); + + res.failedNodeId(nodeId); + + try { + cctx.io().send(req.nearNodeId(), res, cctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("DTH update fut, sent response on DHT node fail " + + "[futId=" + futId + + ", writeVer=" + writeVer + + ", node=" + req.nearNodeId() + + ", failedNode=" + nodeId + ']'); + } + } + catch (ClusterTopologyCheckedException ignored) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DTH update fut, failed to notify near node on DHT node fail, near node left " + + "[futId=" + futId + + ", writeVer=" + writeVer + + ", node=" + req.nearNodeId() + + ", failedNode=" + nodeId + ']'); + } + } + catch (IgniteCheckedException ignored) { + U.error(msgLog, "DTH update fut, failed to notify near node on DHT node fail " + + "[futId=" + futId + + ", writeVer=" + writeVer + + ", node=" + req.nearNodeId() + + ", failedNode=" + nodeId + ']'); + } + } + if (resCnt0 == mappings.size()) onDone(); @@ -361,11 +405,12 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte /** * Sends requests to remote nodes. * + * @param completionCb Callback to invoke to send response to near node. * @param ret Cache operation return value. */ - final void map(GridCacheReturn ret) { + final void map(GridDhtAtomicCache.UpdateReplyClosure completionCb, GridCacheReturn ret) { boolean fullSync = updateReq.writeSynchronizationMode() == FULL_SYNC; - boolean primaryReplyToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC || ret.hasValue(); + repliedToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC || ret.hasValue(); List<UUID> dhtNodes = null; @@ -378,14 +423,14 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte else dhtNodes = Collections.emptyList(); - if (primaryReplyToNear) + if (repliedToNear) updateRes.mapping(dhtNodes); } if (!F.isEmpty(mappings)) { - sendDhtRequests(fullSync && !primaryReplyToNear, dhtNodes, ret); + sendDhtRequests(fullSync && !repliedToNear, dhtNodes, ret); - if (primaryReplyToNear) + if (repliedToNear) completionCb.apply(updateReq, updateRes); else { if (fullSync && GridDhtAtomicCache.IGNITE_ATOMIC_SND_MAPPING_TO_NEAR) { http://git-wip-us.apache.org/repos/asf/ignite/blob/11e8776a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java index c991e66..1841a49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java @@ -56,6 +56,9 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag /** */ public static final int DHT_ATOMIC_RESULT_SUCCESS_MASK = 0x10; + /** */ + public static final int DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE = 0x20; + /** Message index. */ public static final int CACHE_MSG_IDX = nextIndexId(); http://git-wip-us.apache.org/repos/asf/ignite/blob/11e8776a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 30d58cb..0557bc6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1825,7 +1825,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion()); - dhtFut = createDhtFuture(ver, req, res, completionCb, false); + dhtFut = createDhtFuture(ver, req, res, false); expiry = expiryPolicy(req.expiry()); @@ -1865,7 +1865,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { locked, ver, dhtFut, - completionCb, ctx.isDrEnabled(), taskName, expiry, @@ -1947,7 +1946,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { else { // If there are backups, map backup update future. if (dhtFut != null) { - dhtFut.map(res.returnValue()); + dhtFut.map(completionCb, res.returnValue()); // Otherwise, complete the call. } else @@ -2135,7 +2134,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, entryProcessorMap, dhtFut, - completionCb, req, res, replicate, @@ -2184,7 +2182,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { rmvKeys, entryProcessorMap, dhtFut, - completionCb, req, res, replicate, @@ -2311,7 +2308,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { rmvKeys, entryProcessorMap, dhtFut, - completionCb, req, res, replicate, @@ -2391,7 +2387,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param locked Locked entries. * @param ver Assigned update version. * @param dhtFut Optional DHT future. - * @param completionCb Completion callback to invoke when DHT future is completed. * @param replicate Whether DR is enabled for that cache. * @param taskName Task name. * @param expiry Expiry policy. @@ -2407,7 +2402,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { List<GridDhtCacheEntry> locked, GridCacheVersion ver, @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut, - GridDhtAtomicCache.UpdateReplyClosure completionCb, boolean replicate, String taskName, @Nullable IgniteCacheExpiryPolicy expiry, @@ -2485,7 +2479,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { dhtFut); if (dhtFut == null && !F.isEmpty(filteredReaders)) { - dhtFut = createDhtFuture(ver, req, res, completionCb, true); + dhtFut = createDhtFuture(ver, req, res, true); readersOnly = true; } @@ -2638,7 +2632,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable final Collection<KeyCacheObject> rmvKeys, @Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap, @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut, - final GridDhtAtomicCache.UpdateReplyClosure completionCb, final GridNearAtomicAbstractUpdateRequest req, final GridNearAtomicUpdateResponse res, final boolean replicate, @@ -2792,7 +2785,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { batchRes.addDeleted(entry, updRes, entries); if (dhtFut == null && !F.isEmpty(filteredReaders)) { - dhtFut = createDhtFuture(ver, req, res, completionCb, true); + dhtFut = createDhtFuture(ver, req, res, true); batchRes.readersOnly(true); } @@ -3112,7 +3105,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param writeVer Write version. * @param updateReq Update request. * @param updateRes Update response. - * @param completionCb Completion callback to invoke when future is completed. * @param force If {@code true} then creates future without optimizations checks. * @return Backup update future or {@code null} if there are no backups. */ @@ -3120,13 +3112,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheVersion writeVer, GridNearAtomicAbstractUpdateRequest updateReq, GridNearAtomicUpdateResponse updateRes, - GridDhtAtomicCache.UpdateReplyClosure completionCb, boolean force ) { if (updateReq.size() == 1) - return new GridDhtAtomicSingleUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes); + return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq, updateRes); else - return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes); + return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq, updateRes); // if (!force) { // if (updateReq.fastMap()) // return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/11e8776a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java index 244c22c..af9e908 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java @@ -32,6 +32,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_HAS_RESULT_MASK; +import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE; import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_RESULT_SUCCESS_MASK; /** @@ -63,6 +64,9 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage { /** */ private UpdateErrors errors; + /** */ + private UUID failedNodeId; + /** * */ @@ -78,7 +82,13 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage { * @param mapping Update mapping. * @param flags Flags. */ - public GridDhtAtomicNearResponse(int cacheId, int partId, long futId, UUID primaryId, List<UUID> mapping, byte flags) { + public GridDhtAtomicNearResponse(int cacheId, + int partId, + long futId, + UUID primaryId, + List<UUID> mapping, + byte flags) + { this.cacheId = cacheId; this.partId = partId; this.futId = futId; @@ -88,9 +98,31 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage { } /** + * @return Failed node ID. + */ + UUID failedNodeId() { + return failedNodeId; + } + + /** + * @param failedNodeId Failed node ID (used when primary notifies near node). + */ + void failedNodeId(UUID failedNodeId) { + assert failedNodeId != null; + + this.failedNodeId = failedNodeId; + + setFlag(true, DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE); + } + + boolean primaryDhtFailureResponse() { + return isFlag(DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE); + } + + /** * @return Primary node ID. */ - public UUID primaryId() { + UUID primaryId() { return primaryId; } @@ -170,7 +202,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 9; + return 10; } /** {@inheritDoc} */ @@ -216,30 +248,36 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage { writer.incrementState(); case 4: - if (!writer.writeByte("flags", flags)) + if (!writer.writeUuid("failedNodeId", failedNodeId)) return false; writer.incrementState(); case 5: - if (!writer.writeLong("futId", futId)) + if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); case 6: - if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID)) + if (!writer.writeLong("futId", futId)) return false; writer.incrementState(); case 7: - if (!writer.writeInt("partId", partId)) + if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID)) return false; writer.incrementState(); case 8: + if (!writer.writeInt("partId", partId)) + return false; + + writer.incrementState(); + + case 9: if (!writer.writeUuid("primaryId", primaryId)) return false; @@ -270,7 +308,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage { reader.incrementState(); case 4: - flags = reader.readByte("flags"); + failedNodeId = reader.readUuid("failedNodeId"); if (!reader.isLastRead()) return false; @@ -278,7 +316,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage { reader.incrementState(); case 5: - futId = reader.readLong("futId"); + flags = reader.readByte("flags"); if (!reader.isLastRead()) return false; @@ -286,7 +324,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage { reader.incrementState(); case 6: - mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID); + futId = reader.readLong("futId"); if (!reader.isLastRead()) return false; @@ -294,7 +332,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage { reader.incrementState(); case 7: - partId = reader.readInt("partId"); + mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID); if (!reader.isLastRead()) return false; @@ -302,6 +340,14 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage { reader.incrementState(); case 8: + partId = reader.readInt("partId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: primaryId = reader.readUuid("primaryId"); if (!reader.isLastRead()) http://git-wip-us.apache.org/repos/asf/ignite/blob/11e8776a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java index c6b3f5b..b431bd7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java @@ -50,20 +50,17 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture /** * @param cctx Cache context. - * @param completionCb Callback to invoke when future is completed. * @param writeVer Write version. * @param updateReq Update request. * @param updateRes Update response. */ GridDhtAtomicSingleUpdateFuture( GridCacheContext cctx, - GridDhtAtomicCache.UpdateReplyClosure completionCb, GridCacheVersion writeVer, GridNearAtomicAbstractUpdateRequest updateReq, GridNearAtomicUpdateResponse updateRes ) { super(cctx, - completionCb, writeVer, updateReq, updateRes); http://git-wip-us.apache.org/repos/asf/ignite/blob/11e8776a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 673fb5d..7303736 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -50,23 +50,19 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture { /** Entries with readers. */ private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries; - /** * @param cctx Cache context. - * @param completionCb Callback to invoke when future is completed. * @param writeVer Write version. * @param updateReq Update request. * @param updateRes Update response. */ GridDhtAtomicUpdateFuture( GridCacheContext cctx, - GridDhtAtomicCache.UpdateReplyClosure completionCb, GridCacheVersion writeVer, GridNearAtomicAbstractUpdateRequest updateReq, GridNearAtomicUpdateResponse updateRes ) { super(cctx, - completionCb, writeVer, updateReq, updateRes); http://git-wip-us.apache.org/repos/asf/ignite/blob/11e8776a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index 66d392b..258a0bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -442,6 +442,13 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt * @return {@code True} if request processing finished. */ boolean onDhtResponse(GridCacheContext cctx, UUID nodeId, GridDhtAtomicNearResponse res) { + if (res.primaryDhtFailureResponse()) { + assert res.mapping() != null : res; + assert res.failedNodeId() != null : res; + + nodeId = res.failedNodeId(); + } + if (res.mapping() != null) { // Mapping is sent from dht nodes. if (mapping == null)
