ignite-4705
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/28d4779b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/28d4779b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/28d4779b Branch: refs/heads/ignite-4705-debug Commit: 28d4779b318a688a9d36371c7d1d5ab7b254b5d5 Parents: 2b38fd9 Author: sboikov <[email protected]> Authored: Tue Mar 7 17:07:47 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Mar 7 17:36:15 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 8 ++ .../dht/atomic/GridDhtAtomicCache.java | 14 ++- .../GridNearAtomicAbstractUpdateFuture.java | 41 +++++-- .../GridNearAtomicAbstractUpdateRequest.java | 8 ++ .../GridNearAtomicSingleUpdateFuture.java | 4 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 8 +- .../atomic/GridNearAtomicUpdateResponse.java | 24 ++-- .../atomic/IgniteCacheAtomicProtocolTest.java | 116 +++++++++++++++++++ 8 files changed, 196 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 50820ae..a1b94a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -590,6 +590,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { ctx.cacheId(), nodeId, req.futureId(), + req.partition(), + false, ctx.deploymentEnabled()); res.error(req.classError()); @@ -768,6 +770,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { ctx.cacheId(), nodeId, req.futureId(), + req.partition(), + false, ctx.deploymentEnabled()); res.error(req.classError()); @@ -784,6 +788,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { ctx.cacheId(), nodeId, req.futureId(), + req.partition(), + false, ctx.deploymentEnabled()); res.error(req.classError()); @@ -800,6 +806,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { ctx.cacheId(), nodeId, req.futureId(), + req.partition(), + false, ctx.deploymentEnabled()); res.error(req.classError()); http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index d6fdd10..f216a3c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1708,6 +1708,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureId(), + req.partition(), + false, ctx.deploymentEnabled()); res.addFailedKeys(req.keys(), e); @@ -1736,11 +1738,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return; } - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureId(), + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), + nodeId, + req.futureId(), + req.partition(), + false, ctx.deploymentEnabled()); - res.partition(req.partition()); - assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1); GridDhtAtomicAbstractUpdateFuture dhtFut = null; @@ -3083,10 +3087,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, checkReq.futureId(), + checkReq.partition(), + false, false); - res.partition(checkReq.partition()); - GridCacheReturn ret = new GridCacheReturn(false, true); res.returnValue(ret); http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index 6e98502..8d97732 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -358,6 +358,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), req.nodeId(), req.futureId(), + req.partition(), + true, cctx.deploymentEnabled()); ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " + @@ -378,6 +380,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), req.nodeId(), req.futureId(), + req.partition(), + e instanceof ClusterTopologyCheckedException, cctx.deploymentEnabled()); res.addFailedKeys(req.keys(), e); @@ -387,9 +391,11 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt final void onSendError(GridNearAtomicCheckUpdateRequest req, IgniteCheckedException e) { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), - req.updateRequest().nodeId(), - req.futureId(), - cctx.deploymentEnabled()); + req.updateRequest().nodeId(), + req.futureId(), + req.partition(), + e instanceof ClusterTopologyCheckedException, + cctx.deploymentEnabled()); res.addFailedKeys(req.updateRequest().keys(), e); @@ -497,17 +503,35 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt } /** - * @param nodeId Node ID. - * @return Request if need process primary response, {@code null} otherwise. + * @return Request if need process primary fail response, {@code null} otherwise. */ - @Nullable GridNearAtomicAbstractUpdateRequest processPrimaryResponse(UUID nodeId) { + @Nullable GridNearAtomicAbstractUpdateRequest onPrimaryFail() { if (finished()) return null; - if (req != null && req.nodeId().equals(nodeId) && req.response() == null) + if (req.fullSync() && !req.nodeFailedResponse()) { + req.resetResponse(); + return req; + } + + return req.response() == null ? req : null; + } + + /** + * @param nodeId Node ID. + * @return Request if need process primary response, {@code null} otherwise. + */ + @Nullable GridNearAtomicAbstractUpdateRequest processPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res) { + assert req.nodeId().equals(nodeId); + + if (res.nodeLeftResponse()) + return onPrimaryFail(); + + if (finished()) + return null; - return null; + return req.response() == null ? req : null; } /** @@ -558,6 +582,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt /** * @param res Response. + * @param cctx Cache context. * @return {@code True} if request processing finished. */ boolean onPrimaryResponse(GridNearAtomicUpdateResponse res, GridCacheContext cctx) { http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java index 4f288b1..23301c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java @@ -265,6 +265,10 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa return false; } + void resetResponse() { + this.res = null; + } + /** * @return Response. */ @@ -272,6 +276,10 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa return res; } + boolean nodeFailedResponse() { + return res != null && res.nodeLeftResponse(); + } + /** * @return Topology locked flag. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 6b88bcf..f69d31d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -146,7 +146,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda return false; if (reqState.req.nodeId.equals(nodeId)) { - GridNearAtomicAbstractUpdateRequest req = reqState.processPrimaryResponse(nodeId); + GridNearAtomicAbstractUpdateRequest req = reqState.onPrimaryFail(); if (req != null) { GridNearAtomicUpdateResponse res = primaryFailedResponse(req); @@ -261,7 +261,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda if (futId == null || futId != res.futureId()) return; - req = reqState.processPrimaryResponse(nodeId); + req = reqState.processPrimaryResponse(nodeId, res); if (req == null) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 60caa14..a44ccf9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -172,7 +172,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (singleReq != null) { if (singleReq.req.nodeId.equals(nodeId)) { - GridNearAtomicAbstractUpdateRequest req = singleReq.processPrimaryResponse(nodeId); + GridNearAtomicAbstractUpdateRequest req = singleReq.onPrimaryFail(); if (req != null) { rcvAll = true; @@ -211,7 +211,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu boolean reqDone = false; if (e.getKey().equals(nodeId)) { - GridNearAtomicAbstractUpdateRequest req = reqState.processPrimaryResponse(nodeId); + GridNearAtomicAbstractUpdateRequest req = reqState.onPrimaryFail(); if (req != null) { reqDone = true; @@ -377,7 +377,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu return; if (singleReq != null) { - req = singleReq.processPrimaryResponse(nodeId); + req = singleReq.processPrimaryResponse(nodeId, res); if (req == null) return; @@ -393,7 +393,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (reqState == null) return; - req = reqState.processPrimaryResponse(nodeId); + req = reqState.processPrimaryResponse(nodeId, res); if (req != null) { if (reqState.onPrimaryResponse(res, cctx)) { http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index e2646a3..3ee6a61 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -101,6 +101,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr @GridToStringInclude private List<UUID> dhtNodes; + /** */ + @GridDirectTransient + private boolean nodeLeft; + /** * Empty constructor required by {@link Externalizable}. */ @@ -114,13 +118,24 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr * @param futId Future ID. * @param addDepInfo Deployment info flag. */ - public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, long futId, boolean addDepInfo) { + public GridNearAtomicUpdateResponse(int cacheId, + UUID nodeId, + long futId, + int partId, + boolean nodeLeft, + boolean addDepInfo) { this.cacheId = cacheId; this.nodeId = nodeId; this.futId = futId; + this.partId = partId; + this.nodeLeft = nodeLeft; this.addDepInfo = addDepInfo; } + public boolean nodeLeftResponse() { + return nodeLeft; + } + /** {@inheritDoc} */ @Override public int lookupIndex() { return CACHE_MSG_IDX; @@ -162,13 +177,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr } /** - * @param partId Partition ID for proper striping on near node. - */ - public void partition(int partId) { - this.partId = partId; - } - - /** * Sets update error. * * @param err Error. http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java index 075be0e..3b038bd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java @@ -81,6 +81,7 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest { cfg.setConsistentId(gridName); ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setMaxMissedClientHeartbeats(1000); TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi(); @@ -631,6 +632,121 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testPutMissedDhtRequest_UnstableTopology() throws Exception { + blockRebalance = true; + + ccfg = cacheConfiguration(1, FULL_SYNC); + + startServers(4); + + client = true; + + Ignite client = startGrid(4); + + IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE); + IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync(); + + testSpi(ignite(0)).blockMessages(new IgnitePredicate<GridIoMessage>() { + @Override public boolean apply(GridIoMessage msg) { + return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest; + } + }); + + Integer key = primaryKey(ignite(0).cache(TEST_CACHE)); + + log.info("Start put [key=" + key + ']'); + + nearAsyncCache.put(key, key); + IgniteFuture<?> fut = nearAsyncCache.future(); + + U.sleep(500); + + assertFalse(fut.isDone()); + + stopGrid(0); + + fut.get(); + + checkData(F.asMap(key, key)); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllMissedDhtRequest_UnstableTopology1() throws Exception { + putAllMissedDhtRequest_UnstableTopology(true, false); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllMissedDhtRequest_UnstableTopology2() throws Exception { + putAllMissedDhtRequest_UnstableTopology(true, true); + } + + /** + * @param fail0 Fail node 0 flag. + * @param fail1 Fail node 1 flag. + * @throws Exception If failed. + */ + private void putAllMissedDhtRequest_UnstableTopology(boolean fail0, boolean fail1) throws Exception { + blockRebalance = true; + + ccfg = cacheConfiguration(1, FULL_SYNC); + + startServers(4); + + client = true; + + Ignite client = startGrid(4); + + IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE); + IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync(); + + if (fail0) { + testSpi(ignite(0)).blockMessages(new IgnitePredicate<GridIoMessage>() { + @Override public boolean apply(GridIoMessage msg) { + return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest; + } + }); + } + if (fail1) { + testSpi(ignite(2)).blockMessages(new IgnitePredicate<GridIoMessage>() { + @Override public boolean apply(GridIoMessage msg) { + return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest; + } + }); + } + + Integer key1 = primaryKey(ignite(0).cache(TEST_CACHE)); + Integer key2 = primaryKey(ignite(2).cache(TEST_CACHE)); + + log.info("Start put [key1=" + key1 + ", key2=" + key1 + ']'); + + Map<Integer, Integer> map = new HashMap<>(); + map.put(key1, 10); + map.put(key2, 20); + + nearAsyncCache.putAll(map); + IgniteFuture<?> fut = nearAsyncCache.future(); + + U.sleep(500); + + assertFalse(fut.isDone()); + + if (fail0) + stopGrid(0); + if (fail1) + stopGrid(2); + + fut.get(); + + checkData(map); + } + + /** * @param expData Expected cache data. */ private void checkData(Map<Integer, Integer> expData) {
