IGNITE-3075 Implement single key-value pair DHT request/response for ATOMIC cache.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/51ca24f2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/51ca24f2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/51ca24f2 Branch: refs/heads/ignite-4242 Commit: 51ca24f2db32dff9c0034603ea3abfe5ef5cd846 Parents: 88f38ac Author: Konstantin Dudkov <[email protected]> Authored: Mon Nov 21 16:48:44 2016 +0300 Committer: Konstantin Dudkov <[email protected]> Committed: Mon Nov 21 16:48:44 2016 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 10 +- .../processors/cache/GridCacheIoManager.java | 25 +- .../GridDhtAtomicAbstractUpdateFuture.java | 57 +- .../GridDhtAtomicAbstractUpdateRequest.java | 287 ++++++++ .../dht/atomic/GridDhtAtomicCache.java | 17 +- .../atomic/GridDhtAtomicSingleUpdateFuture.java | 61 ++ .../GridDhtAtomicSingleUpdateRequest.java | 678 +++++++++++++++++++ .../dht/atomic/GridDhtAtomicUpdateFuture.java | 26 + .../dht/atomic/GridDhtAtomicUpdateRequest.java | 312 +++------ .../GridNearAtomicAbstractUpdateRequest.java | 5 + .../atomic/GridNearAtomicFullUpdateRequest.java | 108 +-- .../GridNearAtomicSingleUpdateRequest.java | 5 + .../distributed/near/GridNearAtomicCache.java | 8 +- .../GridCacheAtomicMessageCountSelfTest.java | 6 +- ...eAtomicInvalidPartitionHandlingSelfTest.java | 2 +- 15 files changed, 1292 insertions(+), 315 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index b20de68..f36191c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -67,12 +67,13 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; @@ -774,7 +775,12 @@ public class GridIoMessageFactory implements MessageFactory { break; - // [-3..119] [124..127] - this + case -36: + msg = new GridDhtAtomicSingleUpdateRequest(); + + break; + + // [-3..119] [124..127] [-36]- this // [120..123] - DR // [-4..-22, -30..-35] - SQL default: http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index c5c1c60..924ce79 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -45,6 +45,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFini import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; @@ -470,8 +472,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureVersion(); else if (cacheMsg instanceof GridNearAtomicUpdateResponse) return ((GridNearAtomicUpdateResponse) cacheMsg).futureVersion(); - else if (cacheMsg instanceof GridDhtAtomicUpdateRequest) - return ((GridDhtAtomicUpdateRequest)cacheMsg).futureVersion(); + else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest) + return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).futureVersion(); else if (cacheMsg instanceof GridDhtAtomicUpdateResponse) return ((GridDhtAtomicUpdateResponse) cacheMsg).futureVersion(); @@ -486,8 +488,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { @Nullable private GridCacheVersion atomicWriteVersion(GridCacheMessage cacheMsg) { if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest) return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).updateVersion(); - else if (cacheMsg instanceof GridDhtAtomicUpdateRequest) - return ((GridDhtAtomicUpdateRequest)cacheMsg).writeVersion(); + else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest) + return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).writeVersion(); return null; } @@ -791,6 +793,21 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { break; + case -36: { + GridDhtAtomicSingleUpdateRequest req = (GridDhtAtomicSingleUpdateRequest)msg; + + GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse( + ctx.cacheId(), + req.futureVersion(), + ctx.deploymentEnabled()); + + res.onError(req.classError()); + + sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); + } + + break; + default: throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message=" + msg + "]", msg.classError()); http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java index 3bbc348..7e4c4e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; @@ -80,7 +81,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte private final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb; /** Update request. */ - private final GridNearAtomicAbstractUpdateRequest updateReq; + protected final GridNearAtomicAbstractUpdateRequest updateReq; /** Update response. */ final GridNearAtomicUpdateResponse updateRes; @@ -90,7 +91,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte /** Mappings. */ @GridToStringInclude - protected Map<UUID, GridDhtAtomicUpdateRequest> mappings; + protected Map<UUID, GridDhtAtomicAbstractUpdateRequest> mappings; /** Continuous query closures. */ private Collection<CI1<Boolean>> cntQryClsrs; @@ -188,23 +189,16 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte UUID nodeId = node.id(); if (!nodeId.equals(cctx.localNodeId())) { - GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId); + GridDhtAtomicAbstractUpdateRequest updateReq = mappings.get(nodeId); if (updateReq == null) { - updateReq = new GridDhtAtomicUpdateRequest( - cctx.cacheId(), - nodeId, + updateReq = createRequest( + node, futVer, writeVer, syncMode, topVer, - forceTransformBackups, - this.updateReq.subjectId(), - this.updateReq.taskNameHash(), - forceTransformBackups ? this.updateReq.invokeArguments() : null, - cctx.deploymentEnabled(), - this.updateReq.keepBinary(), - this.updateReq.skipStore()); + forceTransformBackups); mappings.put(nodeId, updateReq); } @@ -256,7 +250,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte AffinityTopologyVersion topVer = updateReq.topologyVersion(); for (UUID nodeId : readers) { - GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId); + GridDhtAtomicAbstractUpdateRequest updateReq = mappings.get(nodeId); if (updateReq == null) { ClusterNode node = cctx.discovery().node(nodeId); @@ -265,20 +259,13 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte if (node == null) continue; - updateReq = new GridDhtAtomicUpdateRequest( - cctx.cacheId(), - nodeId, + updateReq = createRequest( + node, futVer, writeVer, syncMode, topVer, - forceTransformBackups, - this.updateReq.subjectId(), - this.updateReq.taskNameHash(), - forceTransformBackups ? this.updateReq.invokeArguments() : null, - cctx.deploymentEnabled(), - this.updateReq.keepBinary(), - this.updateReq.skipStore()); + forceTransformBackups); mappings.put(nodeId, updateReq); } @@ -336,7 +323,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte final boolean registerResponse(UUID nodeId) { int resCnt0; - GridDhtAtomicUpdateRequest req = mappings != null ? mappings.get(nodeId) : null; + GridDhtAtomicAbstractUpdateRequest req = mappings != null ? mappings.get(nodeId) : null; if (req != null) { synchronized (this) { @@ -365,7 +352,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte */ final void map() { if (!F.isEmpty(mappings)) { - for (GridDhtAtomicUpdateRequest req : mappings.values()) { + for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) { try { cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); @@ -412,6 +399,24 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte } /** + * @param node Node. + * @param futVer Future version. + * @param writeVer Update version. + * @param syncMode Write synchronization mode. + * @param topVer Topology version. + * @param forceTransformBackups Force transform backups flag. + * @return Request. + */ + protected abstract GridDhtAtomicAbstractUpdateRequest createRequest( + ClusterNode node, + GridCacheVersion futVer, + GridCacheVersion writeVer, + CacheWriteSynchronizationMode syncMode, + @NotNull AffinityTopologyVersion topVer, + boolean forceTransformBackups + ); + + /** * Callback for backup update response. * * @param nodeId Backup node ID. http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java new file mode 100644 index 0000000..f0bea07 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import java.io.Externalizable; +import java.util.UUID; +import javax.cache.processor.EntryProcessor; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheDeployable; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable { + /** Message index. */ + public static final int CACHE_MSG_IDX = nextIndexId(); + + /** Node ID. */ + @GridDirectTransient + protected UUID nodeId; + + /** On response flag. Access should be synced on future. */ + @GridDirectTransient + private boolean onRes; + + /** + * Empty constructor required by {@link Externalizable}. + */ + protected GridDhtAtomicAbstractUpdateRequest() { + // N-op. + } + + /** + * Constructor. + * + * @param cacheId Cache ID. + * @param nodeId Node ID. + */ + protected GridDhtAtomicAbstractUpdateRequest(int cacheId, UUID nodeId) { + this.cacheId = cacheId; + this.nodeId = nodeId; + } + + /** {@inheritDoc} */ + @Override public int lookupIndex() { + return CACHE_MSG_IDX; + } + + /** + * @return Node ID. + */ + public UUID nodeId() { + return nodeId; + } + + /** + * @return Keep binary flag. + */ + public abstract boolean keepBinary(); + + /** + * @return Skip write-through to a persistent storage. + */ + public abstract boolean skipStore(); + + /** + * @return {@code True} if on response flag changed. + */ + public boolean onResponse() { + return !onRes && (onRes = true); + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return addDepInfo; + } + + /** + * @return Force transform backups flag. + */ + public abstract boolean forceTransformBackups(); + + /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.atomicMessageLogger(); + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + cleanup(); + } + + /** + * @param key Key to add. + * @param val Value, {@code null} if should be removed. + * @param entryProcessor Entry processor. + * @param ttl TTL (optional). + * @param conflictExpireTime Conflict expire time (optional). + * @param conflictVer Conflict version (optional). + * @param addPrevVal If {@code true} adds previous value. + * @param partId Partition. + * @param prevVal Previous value. + * @param updateCntr Update counter. + */ + public abstract void addWriteValue(KeyCacheObject key, + @Nullable CacheObject val, + EntryProcessor<Object, Object, Object> entryProcessor, + long ttl, + long conflictExpireTime, + @Nullable GridCacheVersion conflictVer, + boolean addPrevVal, + int partId, + @Nullable CacheObject prevVal, + @Nullable Long updateCntr + ); + + /** + * @param key Key to add. + * @param val Value, {@code null} if should be removed. + * @param entryProcessor Entry processor. + * @param ttl TTL. + * @param expireTime Expire time. + */ + public abstract void addNearWriteValue(KeyCacheObject key, + @Nullable CacheObject val, + EntryProcessor<Object, Object, Object> entryProcessor, + long ttl, + long expireTime); + + /** + * Cleanup values not needed after message was sent. + */ + protected abstract void cleanup(); + + /** + * @return Subject ID. + */ + public abstract UUID subjectId(); + + /** + * @return Task name. + */ + public abstract int taskNameHash(); + + /** + * @return Version assigned on primary node. + */ + public abstract GridCacheVersion futureVersion(); + + /** + * @return Write version. + */ + public abstract GridCacheVersion writeVersion(); + + /** + * @return Cache write synchronization mode. + */ + public abstract CacheWriteSynchronizationMode writeSynchronizationMode(); + + /** + * @return Keys size. + */ + public abstract int size(); + + /** + * @return Keys size. + */ + public abstract int nearSize(); + + /** + * @param key Key to check. + * @return {@code true} if request keys contain key. + */ + public abstract boolean hasKey(KeyCacheObject key); + + /** + * @param idx Key index. + * @return Key. + */ + public abstract KeyCacheObject key(int idx); + + /** + * @param idx Partition index. + * @return Partition id. + */ + public abstract int partitionId(int idx); + + /** + * @param updCntr Update counter. + * @return Update counter. + */ + public abstract Long updateCounter(int updCntr); + + /** + * @param idx Near key index. + * @return Key. + */ + public abstract KeyCacheObject nearKey(int idx); + + /** + * @param idx Key index. + * @return Value. + */ + @Nullable public abstract CacheObject value(int idx); + + /** + * @param idx Key index. + * @return Value. + */ + @Nullable public abstract CacheObject previousValue(int idx); + + /** + * @param idx Key index. + * @return Entry processor. + */ + @Nullable public abstract EntryProcessor<Object, Object, Object> entryProcessor(int idx); + + /** + * @param idx Near key index. + * @return Value. + */ + @Nullable public abstract CacheObject nearValue(int idx); + + /** + * @param idx Key index. + * @return Transform closure. + */ + @Nullable public abstract EntryProcessor<Object, Object, Object> nearEntryProcessor(int idx); + + /** + * @param idx Index. + * @return Conflict version. + */ + @Nullable public abstract GridCacheVersion conflictVersion(int idx); + + /** + * @param idx Index. + * @return TTL. + */ + public abstract long ttl(int idx); + + /** + * @param idx Index. + * @return TTL for near cache update. + */ + public abstract long nearTtl(int idx); + + /** + * @param idx Index. + * @return Conflict expire time. + */ + public abstract long conflictExpireTime(int idx); + + /** + * @param idx Index. + * @return Expire time for near cache update. + */ + public abstract long nearExpireTime(int idx); + + /** + * @return Optional arguments for entry processor. + */ + @Nullable public abstract Object[] invokeArguments(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index d7eb062..2a7055d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -360,11 +360,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.io().addHandler( ctx.cacheId(), - GridDhtAtomicUpdateRequest.class, - new CI2<UUID, GridDhtAtomicUpdateRequest>() { + GridDhtAtomicAbstractUpdateRequest.class, + new CI2<UUID, GridDhtAtomicAbstractUpdateRequest>() { @Override public void apply( UUID nodeId, - GridDhtAtomicUpdateRequest req + GridDhtAtomicAbstractUpdateRequest req ) { processDhtAtomicUpdateRequest( nodeId, @@ -3100,12 +3100,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridNearAtomicAbstractUpdateFuture fut = (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); - if (fut != null) { - if (fut instanceof GridNearAtomicSingleUpdateFuture) - ((GridNearAtomicSingleUpdateFuture)fut).onResult(nodeId, res, false); - else - ((GridNearAtomicUpdateFuture)fut).onResult(nodeId, res, false); - } + if (fut != null) + fut.onResult(nodeId, res, false); + else U.warn(msgLog, "Failed to find near update future for update response (will ignore) " + "[futId" + res.futureVersion() + ", node=" + nodeId + ", res=" + res + ']'); @@ -3115,7 +3112,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param nodeId Sender node ID. * @param req Dht atomic update request. */ - private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicUpdateRequest req) { + private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicAbstractUpdateRequest req) { if (msgLog.isDebugEnabled()) { msgLog.debug("Received DHT atomic update request [futId=" + req.futureVersion() + ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java index f83a7b7..656caab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java @@ -20,7 +20,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.util.Collection; import java.util.List; import java.util.UUID; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -30,6 +32,8 @@ import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteProductVersion; +import org.jetbrains.annotations.NotNull; /** * @@ -38,6 +42,9 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture /** */ private static final long serialVersionUID = 0L; + /** */ + private static final IgniteProductVersion SINGLE_UPDATE_REQUEST = IgniteProductVersion.fromString("1.7.4"); + /** Future keys. */ private KeyCacheObject key; @@ -87,6 +94,49 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture } /** {@inheritDoc} */ + @Override protected GridDhtAtomicAbstractUpdateRequest createRequest( + ClusterNode node, + GridCacheVersion futVer, + GridCacheVersion writeVer, + CacheWriteSynchronizationMode syncMode, + @NotNull AffinityTopologyVersion topVer, + boolean forceTransformBackups + ) { + if (canUseSingleRequest(node)) { + assert !forceTransformBackups; + + return new GridDhtAtomicSingleUpdateRequest( + cctx.cacheId(), + node.id(), + futVer, + writeVer, + syncMode, + topVer, + updateReq.subjectId(), + updateReq.taskNameHash(), + cctx.deploymentEnabled(), + updateReq.keepBinary(), + updateReq.skipStore()); + } + else { + return new GridDhtAtomicUpdateRequest( + cctx.cacheId(), + node.id(), + futVer, + writeVer, + syncMode, + topVer, + forceTransformBackups, + updateReq.subjectId(), + updateReq.taskNameHash(), + forceTransformBackups ? updateReq.invokeArguments() : null, + cctx.deploymentEnabled(), + updateReq.keepBinary(), + updateReq.skipStore()); + } + } + + /** {@inheritDoc} */ @Override public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) { if (log.isDebugEnabled()) log.debug("Received DHT atomic update future result [nodeId=" + nodeId + ", updateRes=" + updateRes + ']'); @@ -114,6 +164,17 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture updateRes.addFailedKey(key, err); } + /** + * @param node Target node + * @return {@code true} if target node supports {@link GridNearAtomicSingleUpdateRequest} + */ + private boolean canUseSingleRequest(ClusterNode node) { + return node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0 && + cctx.expiry() == null && + updateReq.expiry() == null && + !updateReq.hasConflictData(); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtAtomicSingleUpdateFuture.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java new file mode 100644 index 0000000..a03d948 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java @@ -0,0 +1,678 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import java.io.Externalizable; +import java.nio.ByteBuffer; +import java.util.UUID; +import javax.cache.processor.EntryProcessor; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.KEEP_BINARY_FLAG_MASK; +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK; + +/** + * + */ +public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdateRequest { + /** */ + private static final long serialVersionUID = 0L; + + /** Near cache key flag. */ + private static final int NEAR_FLAG_MASK = 0x80; + + /** Future version. */ + protected GridCacheVersion futVer; + + /** Write version. */ + protected GridCacheVersion writeVer; + + /** Write synchronization mode. */ + protected CacheWriteSynchronizationMode syncMode; + + /** Topology version. */ + protected AffinityTopologyVersion topVer; + + /** Subject ID. */ + protected UUID subjId; + + /** Task name hash. */ + protected int taskNameHash; + + /** Additional flags. */ + protected byte flags; + + /** Key to update. */ + @GridToStringInclude + protected KeyCacheObject key; + + /** Value to update. */ + @GridToStringInclude + protected CacheObject val; + + /** Previous value. */ + @GridToStringInclude + protected CacheObject prevVal; + + /** Partition. */ + protected long updateCntr; + + /** */ + protected int partId; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridDhtAtomicSingleUpdateRequest() { + // No-op. + } + + /** + * Constructor. + * + * @param cacheId Cache ID. + * @param nodeId Node ID. + * @param futVer Future version. + * @param writeVer Write version for cache values. + * @param syncMode Cache write synchronization mode. + * @param topVer Topology version. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. + * @param addDepInfo Deployment info. + * @param keepBinary Keep binary flag. + * @param skipStore Skip store flag. + */ + GridDhtAtomicSingleUpdateRequest( + int cacheId, + UUID nodeId, + GridCacheVersion futVer, + GridCacheVersion writeVer, + CacheWriteSynchronizationMode syncMode, + @NotNull AffinityTopologyVersion topVer, + UUID subjId, + int taskNameHash, + boolean addDepInfo, + boolean keepBinary, + boolean skipStore + ) { + super(cacheId, nodeId); + this.futVer = futVer; + this.writeVer = writeVer; + this.syncMode = syncMode; + this.topVer = topVer; + this.subjId = subjId; + this.taskNameHash = taskNameHash; + this.addDepInfo = addDepInfo; + + if (skipStore) + setFlag(true, SKIP_STORE_FLAG_MASK); + if (keepBinary) + setFlag(true, KEEP_BINARY_FLAG_MASK); + } + + /** + * @param key Key to add. + * @param val Value, {@code null} if should be removed. + * @param entryProcessor Entry processor. + * @param ttl TTL (optional). + * @param conflictExpireTime Conflict expire time (optional). + * @param conflictVer Conflict version (optional). + * @param addPrevVal If {@code true} adds previous value. + * @param partId Partition. + * @param prevVal Previous value. + * @param updateCntr Update counter. + */ + @Override public void addWriteValue(KeyCacheObject key, + @Nullable CacheObject val, + EntryProcessor<Object, Object, Object> entryProcessor, + long ttl, + long conflictExpireTime, + @Nullable GridCacheVersion conflictVer, + boolean addPrevVal, + int partId, + @Nullable CacheObject prevVal, + @Nullable Long updateCntr + ) { + assert entryProcessor == null; + assert ttl <= 0 : ttl; + assert conflictExpireTime <= 0 : conflictExpireTime; + assert conflictVer == null : conflictVer; + + near(false); + + this.key = key; + this.partId = partId; + this.val = val; + + if (addPrevVal) + this.prevVal = prevVal; + + if (updateCntr != null) + this.updateCntr = updateCntr; + } + + /** + * @param key Key to add. + * @param val Value, {@code null} if should be removed. + * @param entryProcessor Entry processor. + * @param ttl TTL. + * @param expireTime Expire time. + */ + @Override public void addNearWriteValue(KeyCacheObject key, + @Nullable CacheObject val, + EntryProcessor<Object, Object, Object> entryProcessor, + long ttl, + long expireTime) { + assert entryProcessor == null; + assert ttl <= 0 : ttl; + + near(true); + + this.key = key; + this.val = val; + } + + /** {@inheritDoc} */ + @Override public boolean forceTransformBackups() { + return false; + } + + /** {@inheritDoc} */ + @Override public int size() { + return key != null ? near() ? 0 : 1 : 0; + } + + /** {@inheritDoc} */ + @Override public int nearSize() { + return key != null ? near() ? 1 : 0 : 0; + } + + /** {@inheritDoc} */ + @Override public boolean hasKey(KeyCacheObject key) { + return !near() && F.eq(this.key, key); + } + + /** {@inheritDoc} */ + @Override public boolean skipStore() { + return isFlag(SKIP_STORE_FLAG_MASK); + } + + /** {@inheritDoc} */ + @Override public KeyCacheObject key(int idx) { + assert idx == 0 : idx; + + return near() ? null : key; + } + + /** {@inheritDoc} */ + @Override public int partitionId(int idx) { + assert idx == 0 : idx; + + return partId; + } + + /** {@inheritDoc} */ + @Override public Long updateCounter(int updCntr) { + assert updCntr == 0 : updCntr; + + return updateCntr; + } + + /** {@inheritDoc} */ + @Override public KeyCacheObject nearKey(int idx) { + assert idx == 0 : idx; + + return near() ? key : null; + } + + /** {@inheritDoc} */ + @Override @Nullable public CacheObject value(int idx) { + assert idx == 0 : idx; + + return near() ? null : val; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion futureVersion() { + return futVer; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion writeVersion() { + return writeVer; + } + + /** {@inheritDoc} */ + @Override public int taskNameHash() { + return taskNameHash; + } + + /** {@inheritDoc} */ + @Override public UUID subjectId() { + return subjId; + } + + /** {@inheritDoc} */ + @Override public CacheWriteSynchronizationMode writeSynchronizationMode() { + return syncMode; + } + + /** {@inheritDoc} */ + @Override @Nullable public CacheObject previousValue(int idx) { + assert idx == 0 : idx; + + return prevVal; + } + + /** {@inheritDoc} */ + @Override @Nullable public CacheObject nearValue(int idx) { + assert idx == 0 : idx; + + return near() ? val : null; + } + + /** {@inheritDoc} */ + @Override @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) { + assert idx == 0 : idx; + + return null; + } + + /** {@inheritDoc} */ + @Override @Nullable public EntryProcessor<Object, Object, Object> nearEntryProcessor(int idx) { + assert idx == 0 : idx; + + return null; + } + + /** {@inheritDoc} */ + @Override @Nullable public GridCacheVersion conflictVersion(int idx) { + assert idx == 0 : idx; + + return null; + } + + /** {@inheritDoc} */ + @Override public long ttl(int idx) { + assert idx == 0 : idx; + + return CU.TTL_NOT_CHANGED; + } + + /** {@inheritDoc} */ + @Override public long nearTtl(int idx) { + assert idx == 0 : idx; + + return CU.TTL_NOT_CHANGED; + } + + /** {@inheritDoc} */ + @Override public long conflictExpireTime(int idx) { + assert idx == 0 : idx; + + return CU.EXPIRE_TIME_CALCULATE; + } + + /** {@inheritDoc} */ + @Override public long nearExpireTime(int idx) { + assert idx == 0 : idx; + + return CU.EXPIRE_TIME_CALCULATE; + } + + /** {@inheritDoc} */ + @Override @Nullable public Object[] invokeArguments() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean keepBinary() { + return isFlag(KEEP_BINARY_FLAG_MASK); + } + + /** + * + */ + private boolean near() { + return isFlag(NEAR_FLAG_MASK); + } + + /** + * + */ + private void near(boolean near) { + setFlag(near, NEAR_FLAG_MASK); + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + GridCacheContext cctx = ctx.cacheContext(cacheId); + + prepareMarshalObject(key, cctx); + + prepareMarshalObject(val, cctx); + + prepareMarshalObject(prevVal, cctx); + + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + GridCacheContext cctx = ctx.cacheContext(cacheId); + + finishUnmarshalObject(key, cctx, ldr); + + finishUnmarshalObject(val, cctx, ldr); + + finishUnmarshalObject(prevVal, cctx, ldr); + + key.partition(partId); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeByte("flags", flags)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeMessage("futVer", futVer)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeMessage("key", key)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeInt("partId", partId)) + return false; + + writer.incrementState(); + + case 7: + if (!writer.writeMessage("prevVal", prevVal)) + return false; + + writer.incrementState(); + + case 8: + if (!writer.writeUuid("subjId", subjId)) + return false; + + writer.incrementState(); + + case 9: + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + return false; + + writer.incrementState(); + + case 10: + if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + writer.incrementState(); + + case 11: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 12: + if (!writer.writeLong("updateCntr", updateCntr)) + return false; + + writer.incrementState(); + + case 13: + if (!writer.writeMessage("val", val)) + return false; + + writer.incrementState(); + + case 14: + if (!writer.writeMessage("writeVer", writeVer)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + flags = reader.readByte("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + futVer = reader.readMessage("futVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + key = reader.readMessage("key"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + partId = reader.readInt("partId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: + prevVal = reader.readMessage("prevVal"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 8: + subjId = reader.readUuid("subjId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: + byte syncModeOrd; + + syncModeOrd = reader.readByte("syncMode"); + + if (!reader.isLastRead()) + return false; + + syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd); + + reader.incrementState(); + + case 10: + taskNameHash = reader.readInt("taskNameHash"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 12: + updateCntr = reader.readLong("updateCntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 13: + val = reader.readMessage("val"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 14: + writeVer = reader.readMessage("writeVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridDhtAtomicSingleUpdateRequest.class); + } + + /** + * @param obj CacheObject to marshal + * @param ctx context + * @throws IgniteCheckedException if error + */ + private void prepareMarshalObject(CacheObject obj, GridCacheContext ctx) throws IgniteCheckedException { + if (obj != null) + obj.prepareMarshal(ctx.cacheObjectContext()); + } + + /** + * @param obj CacheObject un to marshal + * @param ctx context + * @param ldr class loader + * @throws IgniteCheckedException if error + */ + private void finishUnmarshalObject(@Nullable CacheObject obj, GridCacheContext ctx, + ClassLoader ldr) throws IgniteCheckedException { + if (obj != null) + obj.finishUnmarshal(ctx.cacheObjectContext(), ldr); + } + + /** + * Cleanup values not needed after message was sent. + */ + @Override protected void cleanup() { + val = null; + prevVal = null; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -36; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 15; + } + + /** + * Sets flag mask. + * + * @param flag Set or clear. + * @param mask Mask. + */ + private void setFlag(boolean flag, int mask) { + flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); + } + + /** + * Reags flag mask. + * + * @param mask Mask to read. + * @return Flag value. + */ + private boolean isFlag(int mask) { + return (flags & mask) != 0; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtAtomicSingleUpdateRequest.class, this, "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 864aadd..dd1f1c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -23,7 +23,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -33,6 +35,7 @@ import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.NotNull; /** * DHT atomic cache backup update future. @@ -118,6 +121,29 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture { } /** {@inheritDoc} */ + @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(ClusterNode node, + GridCacheVersion futVer, + GridCacheVersion writeVer, + CacheWriteSynchronizationMode syncMode, + @NotNull AffinityTopologyVersion topVer, + boolean forceTransformBackups) { + return new GridDhtAtomicUpdateRequest( + cctx.cacheId(), + node.id(), + futVer, + writeVer, + syncMode, + topVer, + forceTransformBackups, + updateReq.subjectId(), + updateReq.taskNameHash(), + forceTransformBackups ? updateReq.invokeArguments() : null, + cctx.deploymentEnabled(), + updateReq.keepBinary(), + updateReq.skipStore()); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtAtomicUpdateFuture.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 55f7560..f2fbb0e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -20,25 +20,22 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.io.Externalizable; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.UUID; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; @@ -52,17 +49,10 @@ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_ST /** * Lite dht cache backup update request. */ -public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements GridCacheDeployable { +public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateRequest { /** */ private static final long serialVersionUID = 0L; - /** Message index. */ - public static final int CACHE_MSG_IDX = nextIndexId(); - - /** Node ID. */ - @GridDirectTransient - private UUID nodeId; - /** Future version. */ private GridCacheVersion futVer; @@ -151,10 +141,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** Partition. */ private GridLongList updateCntrs; - /** On response flag. Access should be synced on future. */ - @GridDirectTransient - private boolean onRes; - /** */ @GridDirectTransient private List<Integer> partIds; @@ -162,9 +148,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** Keep binary flag. */ private boolean keepBinary; - /** - * Additional flags. - */ + /** Additional flags. */ private byte flags; /** @@ -204,10 +188,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid boolean keepBinary, boolean skipStore ) { - assert invokeArgs == null || forceTransformBackups; + super(cacheId, nodeId); - this.cacheId = cacheId; - this.nodeId = nodeId; this.futVer = futVer; this.writeVer = writeVer; this.syncMode = syncMode; @@ -215,12 +197,14 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid this.forceTransformBackups = forceTransformBackups; this.subjId = subjId; this.taskNameHash = taskNameHash; + + assert invokeArgs == null || forceTransformBackups; + this.invokeArgs = invokeArgs; this.addDepInfo = addDepInfo; this.keepBinary = keepBinary; - if (skipStore) - flags = (byte)(flags | SKIP_STORE_FLAG_MASK); + setFlag(skipStore, SKIP_STORE_FLAG_MASK); keys = new ArrayList<>(); partIds = new ArrayList<>(); @@ -233,26 +217,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid vals = new ArrayList<>(); } - /** - * @return Force transform backups flag. - */ - public boolean forceTransformBackups() { - return forceTransformBackups; - } - - /** - * @param key Key to add. - * @param val Value, {@code null} if should be removed. - * @param entryProcessor Entry processor. - * @param ttl TTL (optional). - * @param conflictExpireTime Conflict expire time (optional). - * @param conflictVer Conflict version (optional). - * @param addPrevVal If {@code true} adds previous value. - * @param partId Partition. - * @param prevVal Previous value. - * @param updateCntr Update counter. - */ - public void addWriteValue(KeyCacheObject key, + /** {@inheritDoc} */ + @Override public void addWriteValue(KeyCacheObject key, @Nullable CacheObject val, EntryProcessor<Object, Object, Object> entryProcessor, long ttl, @@ -328,14 +294,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid conflictExpireTimes.add(conflictExpireTime); } - /** - * @param key Key to add. - * @param val Value, {@code null} if should be removed. - * @param entryProcessor Entry processor. - * @param ttl TTL. - * @param expireTime Expire time. - */ - public void addNearWriteValue(KeyCacheObject key, + /** {@inheritDoc} */ + @Override public void addNearWriteValue(KeyCacheObject key, @Nullable CacheObject val, EntryProcessor<Object, Object, Object> entryProcessor, long ttl, @@ -387,183 +347,114 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid } /** {@inheritDoc} */ - @Override public int lookupIndex() { - return CACHE_MSG_IDX; - } - - /** - * @return Node ID. - */ - public UUID nodeId() { - return nodeId; + @Override public boolean forceTransformBackups() { + return forceTransformBackups; } - /** - * @return Subject ID. - */ - public UUID subjectId() { + /** {@inheritDoc} */ + @Override public UUID subjectId() { return subjId; } - /** - * @return Task name. - */ - public int taskNameHash() { + /** {@inheritDoc} */ + @Override public int taskNameHash() { return taskNameHash; } - /** - * @return Keys size. - */ - public int size() { - return keys.size(); - } - - /** - * @return Keys size. - */ - public int nearSize() { - return nearKeys != null ? nearKeys.size() : 0; - } - - /** - * @return Version assigned on primary node. - */ - public GridCacheVersion futureVersion() { + /** {@inheritDoc} */ + @Override public GridCacheVersion futureVersion() { return futVer; } - /** - * @return Write version. - */ - public GridCacheVersion writeVersion() { + /** {@inheritDoc} */ + @Override public GridCacheVersion writeVersion() { return writeVer; } - /** - * @return Cache write synchronization mode. - */ - public CacheWriteSynchronizationMode writeSynchronizationMode() { + /** {@inheritDoc} */ + @Override public CacheWriteSynchronizationMode writeSynchronizationMode() { return syncMode; } - /** - * @return Topology version. - */ + /** {@inheritDoc} */ @Override public AffinityTopologyVersion topologyVersion() { return topVer; } - /** - * @return Keys. - */ - public Collection<KeyCacheObject> keys() { - return keys; + /** {@inheritDoc} */ + @Override public int size() { + return keys.size(); } - /** - * @param idx Key index. - * @return Key. - */ - public KeyCacheObject key(int idx) { - return keys.get(idx); + /** {@inheritDoc} */ + @Override public int nearSize() { + return nearKeys != null ? nearKeys.size() : 0; } - /** - * @param idx Partition index. - * @return Partition id. - */ - public int partitionId(int idx) { - return partIds.get(idx); + /** {@inheritDoc} */ + @Override public boolean hasKey(KeyCacheObject key) { + return F.contains(keys, key); } - /** - * @return Skip write-through to a persistent storage. - */ - public boolean skipStore() { - return (flags & SKIP_STORE_FLAG_MASK) == SKIP_STORE_FLAG_MASK; + /** {@inheritDoc} */ + @Override public KeyCacheObject key(int idx) { + return keys.get(idx); } - /** - * @param updCntr Update counter. - * @return Update counter. - */ - public Long updateCounter(int updCntr) { + /** {@inheritDoc} */ + @Override public int partitionId(int idx) { + return partIds.get(idx); + } + + /** {@inheritDoc} */ + @Override public Long updateCounter(int updCntr) { if (updateCntrs != null && updCntr < updateCntrs.size()) return updateCntrs.get(updCntr); return null; } - /** - * @param idx Near key index. - * @return Key. - */ - public KeyCacheObject nearKey(int idx) { + /** {@inheritDoc} */ + @Override public KeyCacheObject nearKey(int idx) { return nearKeys.get(idx); } - /** - * @return Keep binary flag. - */ - public boolean keepBinary() { - return keepBinary; - } - - /** - * @param idx Key index. - * @return Value. - */ - @Nullable public CacheObject value(int idx) { + /** {@inheritDoc} */ + @Override @Nullable public CacheObject value(int idx) { if (vals != null) return vals.get(idx); return null; } - /** - * @param idx Key index. - * @return Value. - */ - @Nullable public CacheObject previousValue(int idx) { + /** {@inheritDoc} */ + @Override @Nullable public CacheObject previousValue(int idx) { if (prevVals != null) return prevVals.get(idx); return null; } - /** - * @param idx Key index. - * @return Entry processor. - */ - @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) { + /** {@inheritDoc} */ + @Override @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) { return entryProcessors == null ? null : entryProcessors.get(idx); } - /** - * @param idx Near key index. - * @return Value. - */ - @Nullable public CacheObject nearValue(int idx) { + /** {@inheritDoc} */ + @Override @Nullable public CacheObject nearValue(int idx) { if (nearVals != null) return nearVals.get(idx); return null; } - /** - * @param idx Key index. - * @return Transform closure. - */ - @Nullable public EntryProcessor<Object, Object, Object> nearEntryProcessor(int idx) { + /** {@inheritDoc} */ + @Override @Nullable public EntryProcessor<Object, Object, Object> nearEntryProcessor(int idx) { return nearEntryProcessors == null ? null : nearEntryProcessors.get(idx); } - /** - * @param idx Index. - * @return Conflict version. - */ - @Nullable public GridCacheVersion conflictVersion(int idx) { + /** {@inheritDoc} */ + @Override @Nullable public GridCacheVersion conflictVersion(int idx) { if (conflictVers != null) { assert idx >= 0 && idx < conflictVers.size(); @@ -573,11 +464,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid return null; } - /** - * @param idx Index. - * @return TTL. - */ - public long ttl(int idx) { + /** {@inheritDoc} */ + @Override public long ttl(int idx) { if (ttls != null) { assert idx >= 0 && idx < ttls.size(); @@ -587,11 +475,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid return CU.TTL_NOT_CHANGED; } - /** - * @param idx Index. - * @return TTL for near cache update. - */ - public long nearTtl(int idx) { + /** {@inheritDoc} */ + @Override public long nearTtl(int idx) { if (nearTtls != null) { assert idx >= 0 && idx < nearTtls.size(); @@ -601,11 +486,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid return CU.TTL_NOT_CHANGED; } - /** - * @param idx Index. - * @return Conflict expire time. - */ - public long conflictExpireTime(int idx) { + /** {@inheritDoc} */ + @Override public long conflictExpireTime(int idx) { if (conflictExpireTimes != null) { assert idx >= 0 && idx < conflictExpireTimes.size(); @@ -615,11 +497,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid return CU.EXPIRE_TIME_CALCULATE; } - /** - * @param idx Index. - * @return Expire time for near cache update. - */ - public long nearExpireTime(int idx) { + /** {@inheritDoc} */ + @Override public long nearExpireTime(int idx) { if (nearExpireTimes != null) { assert idx >= 0 && idx < nearExpireTimes.size(); @@ -629,17 +508,18 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid return CU.EXPIRE_TIME_CALCULATE; } - /** - * @return {@code True} if on response flag changed. - */ - public boolean onResponse() { - return !onRes && (onRes = true); + /** {@inheritDoc} */ + @Override public boolean keepBinary() { + return keepBinary; } - /** - * @return Optional arguments for entry processor. - */ - @Nullable public Object[] invokeArguments() { + /** {@inheritDoc} */ + @Override public boolean skipStore() { + return isFlag(SKIP_STORE_FLAG_MASK); + } + + /** {@inheritDoc} */ + @Override @Nullable public Object[] invokeArguments() { return invokeArgs; } @@ -711,16 +591,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid } /** {@inheritDoc} */ - @Override public boolean addDeploymentInfo() { - return addDepInfo; - } - - /** {@inheritDoc} */ - @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { - return ctx.atomicMessageLogger(); - } - - /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -1083,14 +953,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid } /** {@inheritDoc} */ - @Override public void onAckReceived() { - cleanup(); - } - - /** - * Cleanup values not needed after message was sent. - */ - private void cleanup() { + @Override protected void cleanup() { nearVals = null; prevVals = null; } @@ -1105,6 +968,27 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid return 26; } + /** + * Sets flag mask. + * + * @param flag Set or clear. + * @param mask Mask. + */ + private void setFlag(boolean flag, int mask) { + flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); + } + + /** + * Reags flag mask. + * + * @param mask Mask to read. + * @return Flag value. + */ + private boolean isFlag(int mask) { + return (flags & mask) != 0; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtAtomicUpdateRequest.class, this, "super", super.toString()); http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java index bee2ecd..bae9e3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java @@ -223,4 +223,9 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa * @return Key. */ public abstract KeyCacheObject key(int idx); + + /** + * @return {@code True} if request does not have conflict data. + */ + public abstract boolean hasConflictData(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java index b733d7b..c785828 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -68,24 +69,49 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat /** Future version. */ private GridCacheVersion futVer; - /** Fast map flag. */ - private boolean fastMap; - /** Update version. Set to non-null if fastMap is {@code true}. */ private GridCacheVersion updateVer; /** Topology version. */ private AffinityTopologyVersion topVer; - /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */ - private boolean topLocked; - /** Write synchronization mode. */ private CacheWriteSynchronizationMode syncMode; /** Update operation. */ private GridCacheOperation op; + /** Subject ID. */ + protected UUID subjId; + + /** Task name hash. */ + protected int taskNameHash; + + /** */ + @GridDirectTransient + private GridNearAtomicUpdateResponse res; + + /** Fast map flag. */ + protected boolean fastMap; + + /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */ + protected boolean topLocked; + + /** Flag indicating whether request contains primary keys. */ + protected boolean hasPrimary; + + /** Skip write-through to a persistent storage. */ + protected boolean skipStore; + + /** */ + protected boolean clientReq; + + /** Keep binary flag. */ + protected boolean keepBinary; + + /** Return value flag. */ + protected boolean retval; + /** Keys to update. */ @GridToStringInclude @GridDirectCollection(KeyCacheObject.class) @@ -107,13 +133,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat @GridDirectCollection(byte[].class) private List<byte[]> entryProcessorsBytes; - /** Optional arguments for entry processor. */ - @GridDirectTransient - private Object[] invokeArgs; - - /** Entry processor arguments bytes. */ - private byte[][] invokeArgsBytes; - /** Conflict versions. */ @GridDirectCollection(GridCacheVersion.class) private List<GridCacheVersion> conflictVers; @@ -124,8 +143,12 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat /** Conflict expire times. */ private GridLongList conflictExpireTimes; - /** Return value flag. */ - private boolean retval; + /** Optional arguments for entry processor. */ + @GridDirectTransient + private Object[] invokeArgs; + + /** Entry processor arguments bytes. */ + private byte[][] invokeArgsBytes; /** Expiry policy. */ @GridDirectTransient @@ -137,28 +160,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat /** Filter. */ private CacheEntryPredicate[] filter; - /** Flag indicating whether request contains primary keys. */ - private boolean hasPrimary; - - /** Subject ID. */ - private UUID subjId; - - /** Task name hash. */ - private int taskNameHash; - - /** Skip write-through to a persistent storage. */ - private boolean skipStore; - - /** */ - private boolean clientReq; - - /** Keep binary flag. */ - private boolean keepBinary; - - /** */ - @GridDirectTransient - private GridNearAtomicUpdateResponse res; - /** Maximum possible size of inner collections. */ @GridDirectTransient private int initSize; @@ -523,7 +524,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat } /** {@inheritDoc} */ - @Nullable @Override public CacheEntryPredicate[] filter() { + @Override @Nullable public CacheEntryPredicate[] filter() { return filter; } @@ -533,11 +534,19 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat } /** {@inheritDoc} */ + @Override public boolean hasConflictData() { + return F.size(conflictVers) > 0 || conflictTtls != null || conflictExpireTimes != null; + } + + /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); GridCacheContext cctx = ctx.cacheContext(cacheId); + if (expiryPlc != null && expiryPlcBytes == null) + expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc)); + prepareMarshalCacheObjects(keys, cctx); if (filter != null) { @@ -555,9 +564,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat filter = null; } - if (expiryPlc != null && expiryPlcBytes == null) - expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc)); - if (op == TRANSFORM) { // force addition of deployment info for entry processors if P2P is enabled globally. if (!addDepInfo && ctx.deploymentEnabled()) @@ -579,8 +585,18 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat GridCacheContext cctx = ctx.cacheContext(cacheId); + if (expiryPlcBytes != null && expiryPlc == null) + expiryPlc = U.unmarshal(ctx, expiryPlcBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + finishUnmarshalCacheObjects(keys, cctx, ldr); + if (filter != null) { + for (CacheEntryPredicate p : filter) { + if (p != null) + p.finishUnmarshal(cctx, ldr); + } + } + if (op == TRANSFORM) { if (entryProcessors == null) entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr); @@ -591,16 +607,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat else finishUnmarshalCacheObjects(vals, cctx, ldr); - if (filter != null) { - for (CacheEntryPredicate p : filter) { - if (p != null) - p.finishUnmarshal(cctx, ldr); - } - } - - if (expiryPlcBytes != null && expiryPlc == null) - expiryPlc = U.unmarshal(ctx, expiryPlcBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); - if (partIds != null && !partIds.isEmpty()) { assert partIds.size() == keys.size(); http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java index 211b472..f3b9726 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java @@ -226,6 +226,11 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin return CU.EXPIRE_TIME_CALCULATE; } + /** {@inheritDoc} */ + @Override public boolean hasConflictData() { + return false; + } + /** * {@inheritDoc} * http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index b5b2c72..a8219b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -41,8 +41,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; @@ -302,15 +302,13 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { */ public void processDhtAtomicUpdateRequest( UUID nodeId, - GridDhtAtomicUpdateRequest req, + GridDhtAtomicAbstractUpdateRequest req, GridDhtAtomicUpdateResponse res ) { GridCacheVersion ver = req.writeVersion(); assert ver != null; - Collection<KeyCacheObject> backupKeys = req.keys(); - boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null; String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); @@ -329,7 +327,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { break; } - if (F.contains(backupKeys, key)) { // Reader became backup. + if (req.hasKey(key)) { // Reader became backup. if (entry.markObsolete(ver)) removeEntry(entry); http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java index a6d612a..e8c5db1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java @@ -27,6 +27,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; @@ -141,6 +142,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest commSpi.registerMessage(GridNearAtomicSingleUpdateRequest.class); commSpi.registerMessage(GridNearAtomicFullUpdateRequest.class); commSpi.registerMessage(GridDhtAtomicUpdateRequest.class); + commSpi.registerMessage(GridDhtAtomicSingleUpdateRequest.class); int putCnt = 15; @@ -171,7 +173,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest assertEquals(expNearCnt, commSpi.messageCount(GridNearAtomicFullUpdateRequest.class)); assertEquals(expNearSingleCnt, commSpi.messageCount(GridNearAtomicSingleUpdateRequest.class)); - assertEquals(expDhtCnt, commSpi.messageCount(GridDhtAtomicUpdateRequest.class)); + assertEquals(expDhtCnt, commSpi.messageCount(GridDhtAtomicSingleUpdateRequest.class)); if (writeOrderMode == CLOCK) { for (int i = 1; i < 4; i++) { @@ -179,7 +181,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest assertEquals(0, commSpi.messageCount(GridNearAtomicSingleUpdateRequest.class)); assertEquals(0, commSpi.messageCount(GridNearAtomicFullUpdateRequest.class)); - assertEquals(0, commSpi.messageCount(GridDhtAtomicUpdateRequest.class)); + assertEquals(0, commSpi.messageCount(GridDhtAtomicSingleUpdateRequest.class)); } } else { http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java index 0899423..644e310 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java @@ -478,7 +478,7 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA Object origMsg = msg.message(); return delay && - ((origMsg instanceof GridNearAtomicAbstractUpdateRequest) || (origMsg instanceof GridDhtAtomicUpdateRequest)); + ((origMsg instanceof GridNearAtomicAbstractUpdateRequest) || (origMsg instanceof GridDhtAtomicAbstractUpdateRequest)); } } } \ No newline at end of file
