Repository: ignite Updated Branches: refs/heads/ignite-4680-sb d43d73d32 -> 883509030
tmp Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/88350903 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/88350903 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/88350903 Branch: refs/heads/ignite-4680-sb Commit: 883509030fddf182df934698126a86f7008f7096 Parents: d43d73d Author: sboikov <[email protected]> Authored: Mon Mar 20 12:27:17 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Mar 20 12:27:17 2017 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 26 +++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/88350903/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index f2a251d..923b895 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1794,20 +1794,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Do not check topology version if topology was locked on near node by // external transaction or explicit lock. if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) { + final GridCacheVersion ver = ctx.versions().next(ctx.topology().topologyVersion()); + Map<Integer, int[]> stripemap = req.stripeMap(); - final GridDhtAtomicAbstractUpdateFuture fut = createDhtFuture(null, req, req.size()); + final GridDhtAtomicAbstractUpdateFuture fut = null;//createDhtFuture(null, req, req.size()); + + final AffinityAssignment affAssignment = ctx.affinity().assignment(req.topologyVersion()); ((GridNearAtomicFullUpdateRequest)req).responseHelper(new NearAtomicResponseHelper(stripemap.size())); for (final Map.Entry<Integer, int[]> e : stripemap.entrySet()) { if (stripeIdx == e.getKey()) - update(fut, node, req, e.getValue(), completionCb); + update(affAssignment, ver, fut, node, req, e.getValue(), completionCb); else { ctx.kernalContext().getStripedExecutorService().execute(e.getKey(), new Runnable() { @Override public void run() { try { - update(fut, node, req, e.getValue(), completionCb); + update(affAssignment, ver, fut, node, req, e.getValue(), completionCb); } catch (Exception e) { e.printStackTrace(); @@ -1890,11 +1894,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } private void update( + AffinityAssignment affinityAssignment, + GridCacheVersion ver, GridDhtAtomicAbstractUpdateFuture fut, ClusterNode node, GridNearAtomicAbstractUpdateRequest req, int[] stripeIdxs, UpdateReplyClosure completionCb) throws GridCacheEntryRemovedException { + fut = createDhtFuture(null, req, req.size()); + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), node.id(), req.futureId(), @@ -1908,7 +1916,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Assign next version for update inside entries lock. //if (ver == null) - GridCacheVersion ver = ctx.versions().next(ctx.topology().topologyVersion()); if (hasNear) res.nearVersion(ver); @@ -1926,13 +1933,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheReturn retVal = null; - UpdateSingleResult updRes = updateSingle(node, + UpdateSingleResult updRes = updateSingle( + affinityAssignment, + node, hasNear, req, res, locked, ver, - null, + fut, ctx.isDrEnabled(), null, null, @@ -1951,7 +1960,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridNearAtomicUpdateResponse res0 = req.responseHelper().addResponse(res); if (res0 != null) { - fut.onDone(); + //fut.onDone(); completionCb.apply(req, res); } @@ -2420,6 +2429,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @throws GridCacheEntryRemovedException Should be never thrown. */ private UpdateSingleResult updateSingle( + AffinityAssignment affAssignment, ClusterNode nearNode, boolean hasNear, GridNearAtomicAbstractUpdateRequest req, @@ -2442,8 +2452,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean intercept = ctx.config().getInterceptor() != null; - AffinityAssignment affAssignment = ctx.affinity().assignment(topVer); - int keyNum = stripeIdxs == null ? req.size() : stripeIdxs.length; // Avoid iterator creation.
