Repository: ignite Updated Branches: refs/heads/ignite-2926 4b78262c6 -> 9d862653a
Fixed remap-keys problem. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9d862653 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9d862653 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9d862653 Branch: refs/heads/ignite-2926 Commit: 9d862653aa7a5c1844f0b783637634fb239053af Parents: 4b78262 Author: vozerov-gridgain <[email protected]> Authored: Fri Apr 15 17:22:59 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Fri Apr 15 17:22:59 2016 +0300 ---------------------------------------------------------------------- .../GridNearAtomicSingleUpdateFuture.java | 63 ++++++++++++++++---- 1 file changed, 51 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9d862653/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 3917936..6547b0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; import javax.cache.expiry.ExpiryPolicy; @@ -73,6 +74,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda /** Not null is operation is mapped to single node. */ private GridNearAtomicUpdateRequest req; + /** Keys to remap. */ + private Collection<KeyCacheObject> remapKeys; + /** * @param cctx Cache context. * @param cache Cache instance. @@ -222,6 +226,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda this.req = null; if (res.remapKeys() != null) { + if (remapKeys == null) + remapKeys = U.newHashSet(res.remapKeys().size()); + + remapKeys.addAll(res.remapKeys()); + if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0) mapErrTopVer = req.topologyVersion(); } @@ -260,7 +269,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda } } - if (res.remapKeys() != null) { + if (remapKeys != null) { assert mapErrTopVer != null; remapTopVer = cctx.shared().exchange().topologyVersion(); @@ -285,6 +294,13 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda err = null; + Collection<Object> failedKeys = cause.failedKeys(); + + remapKeys = new ArrayList<>(failedKeys.size()); + + for (Object key : failedKeys) + remapKeys.add(cctx.toCacheKeyObject(key)); + updVer = null; } } @@ -312,14 +328,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda return; } - if (nearEnabled && !nodeErr) { - if (res.remapKeys() != null || !req.hasPrimary()) - return; - - GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); - - near.processNearAtomicUpdateResponse(req, res); - } + if (nearEnabled && !nodeErr) + updateNear(req, res); if (remapTopVer != null) { if (fut0 != null) @@ -332,6 +342,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda } if (topLocked) { + assert !F.isEmpty(remapKeys) : remapKeys; + CachePartialUpdateCheckedException e = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); @@ -340,7 +352,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer)); - e.add(Collections.singleton(key), cause); + e.add(remapKeys, cause); onDone(e); @@ -360,7 +372,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda try { AffinityTopologyVersion topVer = fut.get(); - map(topVer); + map(topVer, remapKeys); } catch (IgniteCheckedException e) { onDone(e); @@ -376,6 +388,23 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda onDone(opRes0, err0); } + /** + * Updates near cache. + * + * @param req Update request. + * @param res Update response. + */ + private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + assert nearEnabled; + + if (res.remapKeys() != null || !req.hasPrimary()) + return; + + GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); + + near.processNearAtomicUpdateResponse(req, res); + } + /** {@inheritDoc} */ @Override protected void mapOnTopology() { cache.topology().readLock(); @@ -427,7 +456,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda cache.topology().readUnlock(); } - map(topVer); + map(topVer, null); } /** @@ -480,6 +509,14 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda /** {@inheritDoc} */ protected void map(AffinityTopologyVersion topVer) { + map(topVer, null); + } + + /** + * @param topVer Topology version. + * @param remapKeys Keys to remap. + */ + void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) { Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); if (F.isEmpty(topNodes)) { @@ -524,6 +561,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda resCnt = 0; req = singleReq0; + + this.remapKeys = null; } } catch (Exception e) {
