ignite-1.5 Fixed GridNearAtomicUpdateFuture to do not complete future before near cache is updated
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/30f3c341 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/30f3c341 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/30f3c341 Branch: refs/heads/ignite-1537 Commit: 30f3c341d9f957c9b96887b27efaf19e3d914afb Parents: 6c5b8a1 Author: sboikov <[email protected]> Authored: Mon Dec 14 15:21:01 2015 +0300 Committer: sboikov <[email protected]> Committed: Mon Dec 14 15:21:01 2015 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 10 +++--- .../dht/atomic/GridNearAtomicUpdateFuture.java | 34 ++++++++++++++++---- .../dht/atomic/GridNearAtomicUpdateRequest.java | 25 ++++++++++++++ ...omicMultiNodeP2PDisabledFullApiSelfTest.java | 5 --- ...ledFairAffinityMultiNodeFullApiSelfTest.java | 5 --- 5 files changed, 59 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/30f3c341/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 1f4cb6a..7bee5a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -151,11 +151,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid @GridDirectTransient private boolean onRes; + /** */ @GridDirectTransient private List<Integer> partIds; + /** */ @GridDirectTransient - private List<CacheObject> localPrevVals; + private List<CacheObject> locPrevVals; /** Keep binary flag. */ private boolean keepBinary; @@ -213,7 +215,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid keys = new ArrayList<>(); partIds = new ArrayList<>(); - localPrevVals = new ArrayList<>(); + locPrevVals = new ArrayList<>(); if (forceTransformBackups) { entryProcessors = new ArrayList<>(); @@ -254,7 +256,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid partIds.add(partId); - localPrevVals.add(prevVal); + locPrevVals.add(prevVal); if (forceTransformBackups) { assert entryProcessor != null; @@ -519,7 +521,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid * @return Value. */ @Nullable public CacheObject localPreviousValue(int idx) { - return localPrevVals.get(idx); + return locPrevVals.get(idx); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/30f3c341/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index ba3d546..922a4b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -167,6 +167,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * @param subjId Subject ID. * @param taskNameHash Task name hash code. * @param skipStore Skip store flag. + * @param keepBinary Keep binary flag. * @param remapCnt Maximum number of retries. * @param waitTopFut If {@code false} does not wait for affinity change future. */ @@ -359,7 +360,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * @param res Update response. */ private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { - if (!nearEnabled || !req.hasPrimary()) + assert nearEnabled; + + if (res.remapKeys() != null || !req.hasPrimary()) return; GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); @@ -544,6 +547,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> @GridToStringInclude private Map<UUID, GridNearAtomicUpdateRequest> mappings; + /** */ + private int resCnt; + /** Error. */ private CachePartialUpdateCheckedException err; @@ -632,10 +638,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> rcvAll = true; } else { - req = mappings != null ? mappings.remove(nodeId) : null; + req = mappings != null ? mappings.get(nodeId) : null; - if (req != null) - rcvAll = mappings.isEmpty(); + if (req != null && req.onResponse(res)) { + resCnt++; + + rcvAll = mappings.size() == resCnt; + } else return; } @@ -731,8 +740,19 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return; } - if (!nodeErr && res.remapKeys() == null) - updateNear(req, res); + if (rcvAll && nearEnabled) { + if (mappings != null) { + for (GridNearAtomicUpdateRequest req0 : mappings.values()) { + GridNearAtomicUpdateResponse res0 = req0.response(); + + assert res0 != null : req0; + + updateNear(req0, res0); + } + } + else + updateNear(req, res); + } if (remapTopVer != null) { if (fut0 != null) @@ -871,6 +891,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> mappings = new HashMap<>(pendingMappings); assert !mappings.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this; + + resCnt = 0; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/30f3c341/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index c24ad34..7c0aba5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -154,6 +154,10 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** Keep binary flag. */ private boolean keepBinary; + /** */ + @GridDirectTransient + private GridNearAtomicUpdateResponse res; + /** * Empty constructor required by {@link Externalizable}. */ @@ -544,6 +548,27 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri return hasPrimary; } + /** + * @param res Response. + * @return {@code True} if current response was {@code null}. + */ + public boolean onResponse(GridNearAtomicUpdateResponse res) { + if (this.res == null) { + this.res = res; + + return true; + } + + return false; + } + + /** + * @return Response. + */ + @Nullable public GridNearAtomicUpdateResponse response() { + return res; + } + /** {@inheritDoc} * @param ctx*/ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/30f3c341/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest.java index c468cc2..d4efff3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest.java @@ -65,9 +65,4 @@ public class GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest return ccfg; } - - /** {@inheritDoc} */ - @Override public void testWithSkipStore() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-1809"); - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/30f3c341/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledFairAffinityMultiNodeFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledFairAffinityMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledFairAffinityMultiNodeFullApiSelfTest.java index e4784f2..64943e6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledFairAffinityMultiNodeFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledFairAffinityMultiNodeFullApiSelfTest.java @@ -33,9 +33,4 @@ public class GridCacheAtomicNearEnabledFairAffinityMultiNodeFullApiSelfTest return cfg; } - - /** {@inheritDoc} */ - @Override public void testWithSkipStore(){ - fail("https://issues.apache.org/jira/browse/IGNITE-1582"); - } }
