Repository: ignite Updated Branches: refs/heads/ignite-comm-balance-master 983c9bd48 -> f28fa9ac5
reimplemented atomic single update to bypass primary on dht updates and send directly to client Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f28fa9ac Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f28fa9ac Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f28fa9ac Branch: refs/heads/ignite-comm-balance-master Commit: f28fa9ac5b08ad40544cb4e7947c9b2a1ef6a087 Parents: 983c9bd Author: Yakov Zhdanov <[email protected]> Authored: Wed Jan 18 20:20:04 2017 +0300 Committer: Yakov Zhdanov <[email protected]> Committed: Wed Jan 18 20:20:04 2017 +0300 ---------------------------------------------------------------------- .../GridDhtAtomicAbstractUpdateFuture.java | 14 ++++- .../GridDhtAtomicAbstractUpdateRequest.java | 8 +++ .../dht/atomic/GridDhtAtomicCache.java | 22 +++++-- .../atomic/GridDhtAtomicSingleUpdateFuture.java | 6 +- .../GridDhtAtomicSingleUpdateRequest.java | 61 +++++++++++++----- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 6 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 66 +++++++++++++------- .../GridNearAtomicAbstractUpdateFuture.java | 2 + .../GridNearAtomicSingleUpdateFuture.java | 2 +- .../communication/tcp/TcpCommunicationSpi.java | 4 +- 10 files changed, 139 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f28fa9ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java index 361fbe2..3ee45ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; @@ -99,6 +100,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte /** Response count. */ private volatile int resCnt; + public UUID nearNodeId; + /** * @param cctx Cache context. * @param completionCb Callback to invoke when future is completed. @@ -111,10 +114,13 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb, GridCacheVersion writeVer, GridNearAtomicAbstractUpdateRequest updateReq, - GridNearAtomicUpdateResponse updateRes) { + GridNearAtomicUpdateResponse updateRes + ) { this.cctx = cctx; - futVer = cctx.versions().next(updateReq.topologyVersion()); + this.futVer = CU.cheatCache(cctx.cacheId()) ? updateReq.futureVersion() : + cctx.versions().next(updateReq.topologyVersion()); + this.updateReq = updateReq; this.completionCb = completionCb; this.updateRes = updateRes; @@ -199,6 +205,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte conflictExpireTime, conflictVer); + updateReq.nearNodeId(nearNodeId); + mappings.put(nodeId, updateReq); } @@ -450,7 +458,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte clsr.apply(suc); } - if (updateReq.writeSynchronizationMode() == FULL_SYNC) + if (updateReq.writeSynchronizationMode() == FULL_SYNC && !CU.cheatCache(cctx.cacheId())) completionCb.apply(updateReq, updateRes); return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/f28fa9ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java index deb9ce4..21d5c87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java @@ -76,6 +76,14 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag return nodeId; } + public UUID nearNodeId() { + return null; + } + + public void nearNodeId(UUID nearNodeId) { + + } + /** * @return Keep binary flag. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f28fa9ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index d52421e..ba0ea89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -680,9 +680,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { true, false).get(); - assert res != null; + assert res != null || CU.cheatCache(ctx.cacheId()); - return res; + return CU.cheatCache(ctx.cacheId()) ? true : res; } /** {@inheritDoc} */ @@ -1905,7 +1905,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (req.writeSynchronizationMode() != FULL_ASYNC) req.cleanup(!node.isLocal()); - if (dhtFut != null) + if (dhtFut != null && !CU.cheatCache(ctx.cacheId())) ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut); } else @@ -2517,6 +2517,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (dhtFut != null) { if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios. + dhtFut.nearNodeId = node.id(); //TODO + GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult(); if (conflictCtx == null) @@ -3195,7 +3197,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @SuppressWarnings("unchecked") private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) { if (msgLog.isDebugEnabled()) - msgLog.debug("Received near atomic update response [futId" + res.futureVersion() + ", node=" + nodeId + ']'); + msgLog.debug("Received near atomic update response [futId=" + res.futureVersion() + ", node=" + nodeId + ']'); res.nodeId(ctx.localNodeId()); @@ -3319,7 +3321,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { try { if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode() == FULL_SYNC) { - ctx.io().send(nodeId, res, ctx.ioPolicy()); + if (CU.cheatCache(ctx.cacheId())) { + ctx.io().send( + req.nearNodeId(), + new GridNearAtomicUpdateResponse(ctx.cacheId(), + req.nearNodeId(), + req.futureVersion(), + false), + ctx.ioPolicy()); + } + else + ctx.io().send(nodeId, res, ctx.ioPolicy()); if (msgLog.isDebugEnabled()) { msgLog.debug("Sent DHT atomic update response [futId=" + req.futureVersion() + http://git-wip-us.apache.org/repos/asf/ignite/blob/f28fa9ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java index 20d6e90..0dc2754 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java @@ -67,7 +67,11 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture GridNearAtomicAbstractUpdateRequest updateReq, GridNearAtomicUpdateResponse updateRes ) { - super(cctx, completionCb, writeVer, updateReq, updateRes); + super(cctx, + completionCb, + writeVer, + updateReq, + updateRes); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f28fa9ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java index a7e6c24..53c3079 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java @@ -52,6 +52,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat private static final int NEAR_FLAG_MASK = 0x80; /** Future version. */ + @GridToStringInclude protected GridCacheVersion futVer; /** Write version. */ @@ -88,8 +89,12 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat protected long updateCntr; /** */ + @GridToStringInclude protected int partId; + @GridToStringInclude + protected UUID nearNodeId; + /** * Empty constructor required by {@link Externalizable}. */ @@ -201,6 +206,14 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat this.val = val; } + public UUID nearNodeId() { + return nearNodeId; + } + + public void nearNodeId(UUID nearNodeId) { + this.nearNodeId = nearNodeId; + } + /** {@inheritDoc} */ @Override public boolean forceTransformBackups() { return false; @@ -441,54 +454,60 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat writer.incrementState(); case 6: - if (!writer.writeInt("partId", partId)) + if (!writer.writeUuid("nearNodeId", nearNodeId)) return false; writer.incrementState(); case 7: - if (!writer.writeMessage("prevVal", prevVal)) + if (!writer.writeInt("partId", partId)) return false; writer.incrementState(); case 8: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeMessage("prevVal", prevVal)) return false; writer.incrementState(); case 9: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 10: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); case 11: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 12: - if (!writer.writeLong("updateCntr", updateCntr)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 13: - if (!writer.writeMessage("val", val)) + if (!writer.writeLong("updateCntr", updateCntr)) return false; writer.incrementState(); case 14: + if (!writer.writeMessage("val", val)) + return false; + + writer.incrementState(); + + case 15: if (!writer.writeMessage("writeVer", writeVer)) return false; @@ -535,7 +554,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); case 6: - partId = reader.readInt("partId"); + nearNodeId = reader.readUuid("nearNodeId"); if (!reader.isLastRead()) return false; @@ -543,7 +562,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); case 7: - prevVal = reader.readMessage("prevVal"); + partId = reader.readInt("partId"); if (!reader.isLastRead()) return false; @@ -551,7 +570,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); case 8: - subjId = reader.readUuid("subjId"); + prevVal = reader.readMessage("prevVal"); if (!reader.isLastRead()) return false; @@ -559,6 +578,14 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); case 9: + subjId = reader.readUuid("subjId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 10: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -570,7 +597,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); - case 10: + case 11: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -578,7 +605,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); - case 11: + case 12: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -586,7 +613,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); - case 12: + case 13: updateCntr = reader.readLong("updateCntr"); if (!reader.isLastRead()) @@ -594,7 +621,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); - case 13: + case 14: val = reader.readMessage("val"); if (!reader.isLastRead()) @@ -602,7 +629,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); - case 14: + case 15: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -652,7 +679,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 15; + return 16; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f28fa9ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index efb35c4..5429adc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -66,7 +66,11 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture { GridNearAtomicAbstractUpdateRequest updateReq, GridNearAtomicUpdateResponse updateRes ) { - super(cctx, completionCb, writeVer, updateReq, updateRes); + super(cctx, + completionCb, + writeVer, + updateReq, + updateRes); keys = new ArrayList<>(updateReq.size()); mappings = U.newHashMap(updateReq.size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/f28fa9ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 7144963..c911a99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -151,6 +151,8 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque /** Additional flags. */ private byte flags; + protected UUID nearNodeId; + /** * Empty constructor required by {@link Externalizable}. */ @@ -292,6 +294,14 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque conflictExpireTimes.add(conflictExpireTime); } + public UUID nearNodeId() { + return nearNodeId; + } + + public void nearNodeId(UUID nearNodeId) { + this.nearNodeId = nearNodeId; + } + /** {@inheritDoc} */ @Override public void addNearWriteValue(KeyCacheObject key, @Nullable CacheObject val, @@ -681,66 +691,72 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque writer.incrementState(); case 15: - if (!writer.writeMessage("nearTtls", nearTtls)) + if (!writer.writeUuid("nearNodeId", nearNodeId)) return false; writer.incrementState(); case 16: - if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("nearTtls", nearTtls)) return false; writer.incrementState(); case 17: - if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG)) + if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 18: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 19: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 20: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); case 21: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 22: - if (!writer.writeMessage("ttls", ttls)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 23: - if (!writer.writeMessage("updateCntrs", updateCntrs)) + if (!writer.writeMessage("ttls", ttls)) return false; writer.incrementState(); case 24: - if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("updateCntrs", updateCntrs)) return false; writer.incrementState(); case 25: + if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 26: if (!writer.writeMessage("writeVer", writeVer)) return false; @@ -859,7 +875,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); case 15: - nearTtls = reader.readMessage("nearTtls"); + nearNodeId = reader.readUuid("nearNodeId"); if (!reader.isLastRead()) return false; @@ -867,7 +883,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); case 16: - nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG); + nearTtls = reader.readMessage("nearTtls"); if (!reader.isLastRead()) return false; @@ -875,7 +891,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); case 17: - prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG); + nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -883,7 +899,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); case 18: - subjId = reader.readUuid("subjId"); + prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -891,6 +907,14 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); case 19: + subjId = reader.readUuid("subjId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 20: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -902,7 +926,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 20: + case 21: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -910,7 +934,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 21: + case 22: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -918,7 +942,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 22: + case 23: ttls = reader.readMessage("ttls"); if (!reader.isLastRead()) @@ -926,7 +950,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 23: + case 24: updateCntrs = reader.readMessage("updateCntrs"); if (!reader.isLastRead()) @@ -934,7 +958,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 24: + case 25: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -942,7 +966,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 25: + case 26: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -968,7 +992,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 26; + return 27; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f28fa9ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index a77facc..862e920 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; @@ -135,6 +136,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt protected GridCacheReturn opRes; /** */ + @GridToStringInclude protected volatile Runnable completer; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f28fa9ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 06076be..da9cb40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -210,7 +210,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda if (!res.futureVersion().equals(futVer)) return; - if (!this.req.nodeId().equals(nodeId)) + if (!this.req.nodeId().equals(nodeId) && !CU.cheatCache(cctx.cacheId())) return; req = this.req; http://git-wip-us.apache.org/repos/asf/ignite/blob/f28fa9ac/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 9601ab1..675e631 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1128,8 +1128,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter Boolean clientMode = ignite.configuration().isClientMode(); usePairedConnections = !clientMode; - - connectionsPerNode = clientMode ? 1 : 2; } } } @@ -3328,7 +3326,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @return {@code True} if given node supports multiple connections per-node for communication. */ private boolean useMultipleConnections(ClusterNode node) { - return node.version().compareToIgnoreTimestamp(MULTIPLE_CONN_SINCE_VER) >= 0 && !node.isClient(); + return node.version().compareToIgnoreTimestamp(MULTIPLE_CONN_SINCE_VER) >= 0; } /**
