Wired up the single-update response (GridNearAtomicSingleUpdateResponse) into the cache logic.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fe132a5f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fe132a5f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fe132a5f Branch: refs/heads/ignite-2523-1 Commit: fe132a5feb0d428c0c86d85e79bf669c820373c6 Parents: c011b1c Author: vozerov-gridgain <[email protected]> Authored: Tue Apr 26 16:24:14 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Apr 26 16:24:14 2016 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 6 +++++ .../processors/cache/GridCacheIoManager.java | 3 ++- .../dht/atomic/GridDhtAtomicCache.java | 28 ++++++++++++++++++-- .../GridNearAtomicSingleUpdateFuture.java | 4 +-- .../GridNearAtomicSingleUpdateResponse.java | 2 +- .../IgniteClientReconnectCacheTest.java | 3 ++- 6 files changed, 39 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/fe132a5f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index e028cf4..833b5b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; @@ -732,6 +733,11 @@ public class GridIoMessageFactory implements MessageFactory { break; + case -24: + msg = new GridNearAtomicSingleUpdateResponse(); + + break; + // [-3..119] [124] - this // [120..123] - DR // [-4..-22] - SQL http://git-wip-us.apache.org/repos/asf/ignite/blob/fe132a5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 4113749..1c72e5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; +import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; @@ -608,7 +609,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { case -23: { GridNearAtomicSingleUpdateRequest req = (GridNearAtomicSingleUpdateRequest)msg; - GridNearAtomicAbstractUpdateResponse res = new GridNearAtomicUpdateResponse( + GridNearAtomicAbstractUpdateResponse res = new GridNearAtomicSingleUpdateResponse( ctx.cacheId(), nodeId, req.futureVersion(), http://git-wip-us.apache.org/repos/asf/ignite/blob/fe132a5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 36cd098..4018663 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -274,6 +274,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); + ctx.io().addHandler(ctx.cacheId(), GridNearAtomicSingleUpdateResponse.class, new CI2<UUID, GridNearAtomicSingleUpdateResponse>() { + @Override public void apply(UUID nodeId, GridNearAtomicSingleUpdateResponse res) { + processNearAtomicUpdateResponse(nodeId, res); + } + }); + ctx.io().addHandler(ctx.cacheId(), 
GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() { @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) { processDhtAtomicUpdateRequest(nodeId, req); @@ -1494,6 +1500,25 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** + * Create near update response. + * + * @param nodeId Node ID. + * @param req Request. + * @return Response. + */ + private GridNearAtomicAbstractUpdateResponse createNearResponse(UUID nodeId, + GridNearAtomicAbstractUpdateRequest req) { + if (req.isSingle()) { + return new GridNearAtomicSingleUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(), + ctx.deploymentEnabled()); + } + else { + return new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(), + ctx.deploymentEnabled()); + } + } + + /** * Executes local update after preloader fetched values. * * @param nodeId Node ID. @@ -1505,8 +1530,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridNearAtomicAbstractUpdateRequest req, CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicAbstractUpdateResponse> completionCb ) { - GridNearAtomicAbstractUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, - req.futureVersion(), ctx.deploymentEnabled()); + GridNearAtomicAbstractUpdateResponse res = createNearResponse(nodeId, req); assert !req.returnValue() || (req.operation() == TRANSFORM || req.keysCount() == 1); http://git-wip-us.apache.org/repos/asf/ignite/blob/fe132a5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java 
index 064e067..a6cff49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -153,7 +153,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda this.req : null; if (req != null && req.response() == null) { - res = new GridNearAtomicUpdateResponse(cctx.cacheId(), + res = new GridNearAtomicSingleUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion(), cctx.deploymentEnabled()); @@ -492,7 +492,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda */ void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) { synchronized (mux) { - GridNearAtomicAbstractUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), + GridNearAtomicAbstractUpdateResponse res = new GridNearAtomicSingleUpdateResponse(cctx.cacheId(), req.nodeId(), req.futureVersion(), cctx.deploymentEnabled()); http://git-wip-us.apache.org/repos/asf/ignite/blob/fe132a5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java index 7a7c04d..9ec6fcf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java @@ -522,7 +522,7 @@ public 
class GridNearAtomicSingleUpdateResponse extends GridNearAtomicAbstractUp /** {@inheritDoc} */ @Override public byte directType() { - return 41; + return -24; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/fe132a5f/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 8451a55..f1eac1a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -51,6 +51,7 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; @@ -815,7 +816,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac Collection<Class> clss = new HashSet<>(); clss.add(GridNearAtomicUpdateResponse.class); - // TODO: Add single. + clss.add(GridNearAtomicSingleUpdateResponse.class); checkOperationInProgressFails(client, ccfg, clss, putOp); }
