http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index c92e0f5..39abb73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -17,12 +17,22 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; @@ -31,16 +41,19 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMvccManager; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.CI2; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; /** * Base for near atomic update futures. @@ -108,28 +121,24 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt protected boolean topLocked; /** Remap count. */ + @GridToStringInclude protected int remapCnt; /** Current topology version. */ + @GridToStringInclude protected AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; /** */ - protected GridCacheVersion updVer; - - /** Topology version when got mapping error. */ - protected AffinityTopologyVersion mapErrTopVer; - - /** */ - protected int resCnt; + @GridToStringInclude + protected AffinityTopologyVersion remapTopVer; /** Error. */ + @GridToStringInclude protected CachePartialUpdateCheckedException err; /** Future ID. */ - protected GridCacheVersion futVer; - - /** Completion future for a particular topology version. */ - protected GridFutureAdapter<Void> topCompleteFut; + @GridToStringInclude + protected Long futId; /** Operation result. */ protected GridCacheReturn opRes; @@ -198,10 +207,30 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt this.remapCnt = remapCnt; } + /** {@inheritDoc} */ + @Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { + return null; + } + + /** + * @param req Request. + */ + void sendCheckUpdateRequest(GridNearAtomicCheckUpdateRequest req) { + try { + cctx.io().send(req.updateRequest().nodeId(), req, cctx.ioPolicy()); + } + catch (ClusterTopologyCheckedException e) { + onSendError(req, e); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + /** * Performs future mapping. */ - public void map() { + public final void map() { AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null); if (topVer == null) @@ -212,18 +241,14 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt // Cannot remap. remapCnt = 1; - GridCacheVersion futVer = addAtomicFuture(topVer); - - if (futVer != null) - map(topVer, futVer); + map(topVer); } } /** * @param topVer Topology version. - * @param futVer Future version */ - protected abstract void map(AffinityTopologyVersion topVer, GridCacheVersion futVer); + protected abstract void map(AffinityTopologyVersion topVer); /** * Maps future on ready topology. @@ -248,8 +273,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt /** * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}. */ - protected boolean storeFuture() { - return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC; + final boolean storeFuture() { + return syncMode != FULL_ASYNC; } /** @@ -258,12 +283,15 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt * @param nodeId Node ID. * @param req Request. */ - protected void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) { + final void sendSingleRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) { if (cctx.localNodeId().equals(nodeId)) { cache.updateAllAsyncInternal(nodeId, req, - new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() { + new GridDhtAtomicCache.UpdateReplyClosure() { @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { - onResult(res.nodeId(), res, false); + if (syncMode != FULL_ASYNC) + onPrimaryResponse(res.nodeId(), res, false); + else if (res.remapTopologyVersion() != null) + ((GridDhtAtomicCache)cctx.cache()).remapToNewPrimary(req); } }); } @@ -272,18 +300,13 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); if (msgLog.isDebugEnabled()) { - msgLog.debug("Near update fut, sent request [futId=" + req.futureVersion() + - ", writeVer=" + req.updateVersion() + + msgLog.debug("Near update fut, sent request [futId=" + req.futureId() + ", node=" + req.nodeId() + ']'); } - - if (syncMode == FULL_ASYNC) - onDone(new GridCacheReturn(cctx, true, true, null, true)); } catch (IgniteCheckedException e) { if (msgLog.isDebugEnabled()) { - msgLog.debug("Near update fut, failed to send request [futId=" + req.futureVersion() + - ", writeVer=" + req.updateVersion() + + msgLog.debug("Near update fut, failed to send request [futId=" + req.futureId() + ", node=" + req.nodeId() + ", err=" + e + ']'); } @@ -300,46 +323,377 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt * @param res Update response. * @param nodeErr {@code True} if response was created on node failure. */ - public abstract void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr); + public abstract void onPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr); + + /** + * @param nodeId Node ID. + * @param res Response. + */ + public abstract void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res); /** * @param req Request. - * @param e Error. + * @param res Response. */ - protected final void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) { - synchronized (mux) { - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), - req.nodeId(), - req.futureVersion(), - cctx.deploymentEnabled()); + final void onPrimaryError(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { + assert res.error() != null; + + if (err == null) + err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); + + Collection<KeyCacheObject> keys0 = res.failedKeys() != null ? res.failedKeys() : req.keys(); + + Collection<Object> keys = new ArrayList<>(keys0.size()); - res.addFailedKeys(req.keys(), e); + for (KeyCacheObject key : keys0) + keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false)); - onResult(req.nodeId(), res, true); + err.add(keys, res.error(), req.topologyVersion()); + } + + /** + * @param req Request. + * @return Response to notify about primary failure. + */ + final GridNearAtomicUpdateResponse primaryFailedResponse(GridNearAtomicAbstractUpdateRequest req) { + assert req.response() == null : req; + assert req.nodeId() != null : req; + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near update fut, node left [futId=" + req.futureId() + + ", node=" + req.nodeId() + ']'); } + + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), + req.nodeId(), + req.futureId(), + req.partition(), + true, + cctx.deploymentEnabled()); + + ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " + + "before response is received: " + req.nodeId()); + + e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion())); + + res.addFailedKeys(req.keys(), e); + + return res; + } + + /** + * @param req Request. + * @param e Error. + */ + final void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) { + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), + req.nodeId(), + req.futureId(), + req.partition(), + e instanceof ClusterTopologyCheckedException, + cctx.deploymentEnabled()); + + res.addFailedKeys(req.keys(), e); + + onPrimaryResponse(req.nodeId(), res, true); + } + + /** + * @param req Request. + * @param e Error. + */ + private void onSendError(GridNearAtomicCheckUpdateRequest req, IgniteCheckedException e) { + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), + req.updateRequest().nodeId(), + req.futureId(), + req.partition(), + e instanceof ClusterTopologyCheckedException, + cctx.deploymentEnabled()); + + res.addFailedKeys(req.updateRequest().keys(), e); + + onPrimaryResponse(req.updateRequest().nodeId(), res, true); } /** - * Adds future prevents topology change before operation complete. - * Should be invoked before topology lock released. * - * @param topVer Topology version. - * @return Future version in case future added. */ - protected final GridCacheVersion addAtomicFuture(AffinityTopologyVersion topVer) { - GridCacheVersion futVer = cctx.versions().next(topVer); + static class PrimaryRequestState { + /** */ + final GridNearAtomicAbstractUpdateRequest req; + + /** */ + @GridToStringInclude + Set<UUID> dhtNodes; + + /** */ + @GridToStringInclude + private Set<UUID> rcvd; + + /** */ + private boolean hasRes; + + /** + * @param req Request. + * @param nodes Affinity nodes. + * @param single {@code True} if created for sigle-key operation. + */ + PrimaryRequestState(GridNearAtomicAbstractUpdateRequest req, List<ClusterNode> nodes, boolean single) { + assert req != null && req.nodeId() != null : req; + + this.req = req; + + if (req.initMappingLocally()) { + if (single) { + if (nodes.size() > 1) { + dhtNodes = U.newHashSet(nodes.size() - 1); + + for (int i = 1; i < nodes.size(); i++) + dhtNodes.add(nodes.get(i).id()); + } + else + dhtNodes = Collections.emptySet(); + } + else { + dhtNodes = new HashSet<>(); + + for (int i = 1; i < nodes.size(); i++) + dhtNodes.add(nodes.get(i).id()); + } + } + } + + /** + * @return Primary node ID. + */ + UUID primaryId() { + return req.nodeId(); + } + + /** + * @param nodes Nodes. + */ + void addMapping(List<ClusterNode> nodes) { + assert req.initMappingLocally(); + + for (int i = 1; i < nodes.size(); i++) + dhtNodes.add(nodes.get(i).id()); + } + + /** + * @param cctx Context. + * @return Check result. + */ + DhtLeftResult checkDhtNodes(GridCacheContext cctx) { + assert req.initMappingLocally() : req; - synchronized (mux) { - assert this.futVer == null : this; - assert this.topVer == AffinityTopologyVersion.ZERO : this; + if (finished()) + return DhtLeftResult.NOT_DONE; - this.topVer = topVer; - this.futVer = futVer; + boolean finished = false; + + for (Iterator<UUID> it = dhtNodes.iterator(); it.hasNext();) { + UUID nodeId = it.next(); + + if (!cctx.discovery().alive(nodeId)) { + it.remove(); + + if (finished()) { + finished = true; + + break; + } + } + } + + if (finished) + return DhtLeftResult.DONE; + + if (dhtNodes.isEmpty()) + return !req.needPrimaryResponse() ? DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE; + + return DhtLeftResult.NOT_DONE; + } + + /** + * @return {@code True} if all expected responses are received. + */ + private boolean finished() { + if (req.writeSynchronizationMode() == PRIMARY_SYNC) + return hasRes; + + return (dhtNodes != null && dhtNodes.isEmpty()) && hasRes; + } + + /** + * @return Request if need process primary fail response, {@code null} otherwise. + */ + @Nullable GridNearAtomicAbstractUpdateRequest onPrimaryFail() { + if (finished()) + return null; + + /* + * When primary failed, even if primary response is received, it is possible it failed to send + * request to backup(s), need remap operation. + */ + if (req.fullSync() && !req.nodeFailedResponse()) { + req.resetResponse(); + + return req; + } + + return req.response() == null ? req : null; } - if (storeFuture() && !cctx.mvcc().addAtomicFuture(futVer, this)) - return null; + /** + * @param nodeId Node ID. + * @param res Response. + * @return Request if need process primary response, {@code null} otherwise. + */ + @Nullable GridNearAtomicAbstractUpdateRequest processPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res) { + assert req.nodeId().equals(nodeId); - return futVer; + if (res.nodeLeftResponse()) + return onPrimaryFail(); + + if (finished()) + return null; + + return req.response() == null ? req : null; + } + + /** + * @param nodeId Node ID. + * @return {@code True} if request processing finished. + */ + DhtLeftResult onDhtNodeLeft(UUID nodeId) { + if (req.writeSynchronizationMode() != FULL_SYNC || dhtNodes == null || finished()) + return DhtLeftResult.NOT_DONE; + + if (dhtNodes.remove(nodeId) && dhtNodes.isEmpty()) { + if (hasRes) + return DhtLeftResult.DONE; + else + return !req.needPrimaryResponse() ? DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE; + } + + return DhtLeftResult.NOT_DONE; + } + + /** + * @param nodeId Node ID. + * @param res Response. + * @return {@code True} if request processing finished. + */ + boolean onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) { + assert req.writeSynchronizationMode() == FULL_SYNC : req; + + if (finished()) + return false; + + if (res.hasResult()) + hasRes = true; + + if (dhtNodes == null) { + if (rcvd == null) + rcvd = new HashSet<>(); + + rcvd.add(nodeId); + + return false; + } + + return dhtNodes.remove(nodeId) && finished(); + } + + /** + * @param res Response. + * @param cctx Cache context. + * @return {@code True} if request processing finished. + */ + boolean onPrimaryResponse(GridNearAtomicUpdateResponse res, GridCacheContext cctx) { + assert !finished() : this; + + hasRes = true; + + boolean onRes = req.onResponse(res); + + assert onRes; + + if (res.error() != null || res.remapTopologyVersion() != null) { + dhtNodes = Collections.emptySet(); // Mark as finished. + + return true; + } + + assert res.returnValue() != null : res; + + if (res.dhtNodes() != null) + initDhtNodes(res.dhtNodes(), cctx); + + return finished(); + } + + /** + * @param nodeIds Node IDs. + * @param cctx Context. + */ + private void initDhtNodes(List<UUID> nodeIds, GridCacheContext cctx) { + assert dhtNodes == null || req.initMappingLocally(); + + Set<UUID> dhtNodes0 = dhtNodes; + + dhtNodes = null; + + for (UUID dhtNodeId : nodeIds) { + if (F.contains(rcvd, dhtNodeId)) + continue; + + if (req.initMappingLocally() && !F.contains(dhtNodes0, dhtNodeId)) + continue; + + if (cctx.discovery().node(dhtNodeId) != null) { + if (dhtNodes == null) + dhtNodes = U.newHashSet(nodeIds.size()); + + dhtNodes.add(dhtNodeId); + } + } + + if (dhtNodes == null) + dhtNodes = Collections.emptySet(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(PrimaryRequestState.class, this, + "primary", primaryId(), + "needPrimaryRes", req.needPrimaryResponse(), + "primaryRes", req.response() != null, + "done", finished()); + } + } + + /** + * + */ + enum DhtLeftResult { + /** All responses and operation result are received. */ + DONE, + + /** Not all responses are received. */ + NOT_DONE, + + /** + * All backups failed and response from primary is not required, + * in this case in FULL_SYNC mode need send additional request + * on primary to ensure FULL_SYNC guarantee. + */ + ALL_RCVD_CHECK_PRIMARY + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridNearAtomicAbstractUpdateFuture.class, this, super.toString()); } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java index bee2ecd..a43bfb0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java @@ -17,18 +17,28 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; +import java.nio.ByteBuffer; import java.util.List; import java.util.UUID; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** @@ -38,106 +48,331 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa /** Message index. */ public static final int CACHE_MSG_IDX = nextIndexId(); + /** . */ + private static final int NEED_PRIMARY_RES_FLAG_MASK = 0x01; + + /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */ + private static final int TOP_LOCKED_FLAG_MASK = 0x02; + + /** Skip write-through to a persistent storage. */ + private static final int SKIP_STORE_FLAG_MASK = 0x04; + + /** Keep binary flag. */ + private static final int KEEP_BINARY_FLAG_MASK = 0x08; + + /** Return value flag. */ + private static final int RET_VAL_FLAG_MASK = 0x10; + + /** Target node ID. */ + @GridDirectTransient + protected UUID nodeId; + + /** Future version. */ + protected long futId; + + /** Topology version. */ + protected AffinityTopologyVersion topVer; + + /** Write synchronization mode. */ + protected CacheWriteSynchronizationMode syncMode; + + /** Update operation. */ + protected GridCacheOperation op; + + /** Subject ID. */ + protected UUID subjId; + + /** Task name hash. */ + protected int taskNameHash; + + /** Compressed boolean flags. Make sure 'toString' is updated when add new flag. */ + @GridToStringExclude + protected byte flags; + + /** */ + @GridDirectTransient + private GridNearAtomicUpdateResponse res; + /** - * @return Mapped node ID. + * */ - public abstract UUID nodeId(); + public GridNearAtomicAbstractUpdateRequest() { + // No-op. + } /** + * Constructor. + * + * @param cacheId Cache ID. * @param nodeId Node ID. + * @param futId Future ID. + * @param topVer Topology version. + * @param topLocked Topology locked flag. + * @param syncMode Synchronization mode. + * @param op Cache update operation. + * @param retval Return value required flag. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. + * @param needPrimaryRes {@code True} if near node waits for primary response. + * @param skipStore Skip write-through to a persistent storage. + * @param keepBinary Keep binary flag. + * @param addDepInfo Deployment info flag. + */ + protected GridNearAtomicAbstractUpdateRequest( + int cacheId, + UUID nodeId, + long futId, + @NotNull AffinityTopologyVersion topVer, + boolean topLocked, + CacheWriteSynchronizationMode syncMode, + GridCacheOperation op, + boolean retval, + @Nullable UUID subjId, + int taskNameHash, + boolean needPrimaryRes, + boolean skipStore, + boolean keepBinary, + boolean addDepInfo + ) { + this.cacheId = cacheId; + this.nodeId = nodeId; + this.futId = futId; + this.topVer = topVer; + this.syncMode = syncMode; + this.op = op; + this.subjId = subjId; + this.taskNameHash = taskNameHash; + this.addDepInfo = addDepInfo; + + if (needPrimaryRes) + needPrimaryResponse(true); + if (topLocked) + topologyLocked(true); + if (retval) + returnValue(true); + if (skipStore) + skipStore(true); + if (keepBinary) + keepBinary(true); + } + + /** {@inheritDoc} */ + @Override public final AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** {@inheritDoc} */ + @Override public final int lookupIndex() { + return CACHE_MSG_IDX; + } + + /** {@inheritDoc} */ + @Override public final boolean addDeploymentInfo() { + return addDepInfo; + } + + /** {@inheritDoc} */ + @Override public final IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.atomicMessageLogger(); + } + + /** + * @return {@code True} if near node is able to initialize update mapping locally. + */ + boolean initMappingLocally() { + return !needPrimaryResponse() && fullSync(); + } + + /** + * @return {@code True} if near node waits for primary response. + */ + boolean needPrimaryResponse() { + return isFlag(NEED_PRIMARY_RES_FLAG_MASK); + } + + /** + * @param needRes {@code True} if near node waits for primary response. + */ + void needPrimaryResponse(boolean needRes) { + setFlag(needRes, NEED_PRIMARY_RES_FLAG_MASK); + } + + /** + * @return {@code True} if update is processed in {@link CacheWriteSynchronizationMode#FULL_SYNC} mode. + */ + boolean fullSync() { + assert syncMode != null; + + return syncMode == CacheWriteSynchronizationMode.FULL_SYNC; + } + + /** + * @return Task name hash code. + */ + public int taskNameHash() { + return taskNameHash; + } + + /** + * @return Update opreation. */ - public abstract void nodeId(UUID nodeId); + public GridCacheOperation operation() { + return op; + } /** * @return Subject ID. */ - public abstract UUID subjectId(); + public UUID subjectId() { + return subjId; + } /** - * @return Task name hash. + * @return Target node ID. */ - public abstract int taskNameHash(); + public UUID nodeId() { + return nodeId; + } /** - * @return Future version. + * @return Near node future ID. */ - public abstract GridCacheVersion futureVersion(); + public long futureId() { + return futId; + } /** - * @return Flag indicating whether this is fast-map udpate. + * @return Write synchronization mode. */ - public abstract boolean fastMap(); + public final CacheWriteSynchronizationMode writeSynchronizationMode() { + return syncMode; + } /** - * @return Update version for fast-map request. + * @param res Response. + * @return {@code True} if current response was {@code null}. */ - public abstract GridCacheVersion updateVersion(); + public boolean onResponse(GridNearAtomicUpdateResponse res) { + if (this.res == null) { + this.res = res; + + return true; + } + + return false; + } /** - * @return Topology locked flag. + * */ - public abstract boolean topologyLocked(); + void resetResponse() { + this.res = null; + } /** - * @return {@code True} if request sent from client node. + * @return Response. */ - public abstract boolean clientRequest(); + @Nullable public GridNearAtomicUpdateResponse response() { + return res; + } /** - * @return Cache write synchronization mode. + * @return {@code True} if received notification about primary fail. */ - public abstract CacheWriteSynchronizationMode writeSynchronizationMode(); + boolean nodeFailedResponse() { + return res != null && res.nodeLeftResponse(); + } /** - * @return Expiry policy. + * @return Topology locked flag. */ - public abstract ExpiryPolicy expiry(); + final boolean topologyLocked() { + return isFlag(TOP_LOCKED_FLAG_MASK); + } + + /** + * @param val {@code True} if topology is locked on near node. + */ + private void topologyLocked(boolean val) { + setFlag(val, TOP_LOCKED_FLAG_MASK); + } /** * @return Return value flag. */ - public abstract boolean returnValue(); + public final boolean returnValue() { + return isFlag(RET_VAL_FLAG_MASK); + } /** - * @return Filter. + * @param val Return value flag. */ - @Nullable public abstract CacheEntryPredicate[] filter(); + public final void returnValue(boolean val) { + setFlag(val, RET_VAL_FLAG_MASK); + } /** * @return Skip write-through to a persistent storage. */ - public abstract boolean skipStore(); + public final boolean skipStore() { + return isFlag(SKIP_STORE_FLAG_MASK); + } + + /** + * @param val Skip store flag. + */ + public void skipStore(boolean val) { + setFlag(val, SKIP_STORE_FLAG_MASK); + } /** * @return Keep binary flag. */ - public abstract boolean keepBinary(); + public final boolean keepBinary() { + return isFlag(KEEP_BINARY_FLAG_MASK); + } /** - * @return Update operation. + * @param val Keep binary flag. */ - public abstract GridCacheOperation operation(); + public void keepBinary(boolean val) { + setFlag(val, KEEP_BINARY_FLAG_MASK); + } /** - * @return Optional arguments for entry processor. + * Sets flag mask. + * + * @param flag Set or clear. + * @param mask Mask. */ - @Nullable public abstract Object[] invokeArguments(); + private void setFlag(boolean flag, int mask) { + flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); + } /** - * @return Flag indicating whether this request contains primary keys. + * Reads flag mask. + * + * @param mask Mask to read. + * @return Flag value. */ - public abstract boolean hasPrimary(); + private boolean isFlag(int mask) { + return (flags & mask) != 0; + } /** - * @param res Response. - * @return {@code True} if current response was {@code null}. + * @return Expiry policy. */ - public abstract boolean onResponse(GridNearAtomicUpdateResponse res); + public abstract ExpiryPolicy expiry(); /** - * @return Response. + * @return Filter. */ - @Nullable public abstract GridNearAtomicUpdateResponse response(); + @Nullable public abstract CacheEntryPredicate[] filter(); + + /** + * @return Optional arguments for entry processor. + */ + @Nullable public abstract Object[] invokeArguments(); /** * @param key Key to add. @@ -145,14 +380,12 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa * @param conflictTtl Conflict TTL (optional). * @param conflictExpireTime Conflict expire time (optional). * @param conflictVer Conflict version (optional). - * @param primary If given key is primary on this mapping. */ - public abstract void addUpdateEntry(KeyCacheObject key, + abstract void addUpdateEntry(KeyCacheObject key, @Nullable Object val, long conflictTtl, long conflictExpireTime, - @Nullable GridCacheVersion conflictVer, - boolean primary); + @Nullable GridCacheVersion conflictVer); /** * @return Keys for this update request. @@ -182,7 +415,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa */ public abstract CacheObject writeValue(int idx); - /** * @return Conflict versions. */ @@ -223,4 +455,170 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa * @return Key. */ public abstract KeyCacheObject key(int idx); + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 10; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeByte("flags", flags)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeUuid("subjId", subjId)) + return false; + + writer.incrementState(); + + case 7: + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + return false; + + writer.incrementState(); + + case 8: + if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + writer.incrementState(); + + case 9: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + flags = reader.readByte("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + byte opOrd; + + opOrd = reader.readByte("op"); + + if (!reader.isLastRead()) + return false; + + op = GridCacheOperation.fromOrdinal(opOrd); + + reader.incrementState(); + + case 6: + subjId = reader.readUuid("subjId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: + byte syncModeOrd; + + syncModeOrd = reader.readByte("syncMode"); + + if (!reader.isLastRead()) + return false; + + syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd); + + reader.incrementState(); + + case 8: + taskNameHash = reader.readInt("taskNameHash"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridNearAtomicAbstractUpdateRequest.class); + } + + /** {@inheritDoc} */ + @Override public String toString() { + StringBuilder flags = new StringBuilder(); + + if (needPrimaryResponse()) + appendFlag(flags, "needRes"); + if (topologyLocked()) + appendFlag(flags, "topLock"); + if (skipStore()) + appendFlag(flags, "skipStore"); + if (keepBinary()) + appendFlag(flags, "keepBinary"); + if (returnValue()) + appendFlag(flags, "retVal"); + + return S.toString(GridNearAtomicAbstractUpdateRequest.class, this, + "flags", flags.toString()); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java new file mode 100644 index 0000000..4d0726a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * + */ +public class GridNearAtomicCheckUpdateRequest extends GridCacheMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Cache message index. */ + public static final int CACHE_MSG_IDX = nextIndexId(); + + /** */ + @GridDirectTransient + private GridNearAtomicAbstractUpdateRequest updateReq; + + /** */ + private int partId; + + /** */ + private long futId; + + /** + * + */ + public GridNearAtomicCheckUpdateRequest() { + // No-op. + } + + /** + * @param updateReq Related update request. + */ + GridNearAtomicCheckUpdateRequest(GridNearAtomicAbstractUpdateRequest updateReq) { + assert updateReq != null && updateReq.fullSync() : updateReq; + + this.updateReq = updateReq; + this.cacheId = updateReq.cacheId(); + this.partId = updateReq.partition(); + this.futId = updateReq.futureId(); + + assert partId >= 0; + } + + /** + * @return Future ID on near node. + */ + public final long futureId() { + return futId; + } + + /** + * @return Related update request. + */ + GridNearAtomicAbstractUpdateRequest updateRequest() { + return updateReq; + } + + /** {@inheritDoc} */ + @Override public int partition() { + return partId; + } + + /** {@inheritDoc} */ + @Override public int lookupIndex() { + return CACHE_MSG_IDX; + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return false; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -47; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 5; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeInt("partId", partId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + partId = reader.readInt("partId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridNearAtomicCheckUpdateRequest.class); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridNearAtomicCheckUpdateRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java index 08c2474..c381333 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java @@ -26,7 +26,6 @@ import java.util.UUID; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; @@ -41,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -61,56 +61,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat /** */ private static final long serialVersionUID = 0L; - /** Target node ID. */ - @GridDirectTransient - private UUID nodeId; - - /** Future version. */ - private GridCacheVersion futVer; - - /** Update version. Set to non-null if fastMap is {@code true}. */ - private GridCacheVersion updateVer; - - /** Topology version. */ - private AffinityTopologyVersion topVer; - - /** Write synchronization mode. */ - private CacheWriteSynchronizationMode syncMode; - - /** Update operation. */ - private GridCacheOperation op; - - /** Subject ID. */ - protected UUID subjId; - - /** Task name hash. */ - protected int taskNameHash; - - /** */ - @GridDirectTransient - private GridNearAtomicUpdateResponse res; - - /** Fast map flag. */ - protected boolean fastMap; - - /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */ - protected boolean topLocked; - - /** Flag indicating whether request contains primary keys. */ - protected boolean hasPrimary; - - /** Skip write-through to a persistent storage. */ - protected boolean skipStore; - - /** */ - protected boolean clientReq; - - /** Keep binary flag. */ - protected boolean keepBinary; - - /** Return value flag. */ - protected boolean retval; - /** Keys to update. */ @GridToStringInclude @GridDirectCollection(KeyCacheObject.class) @@ -120,10 +70,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat @GridDirectCollection(CacheObject.class) private List<CacheObject> vals; - /** Partitions of keys. */ - @GridDirectCollection(int.class) - private List<Integer> partIds; - /** Entry processors. */ @GridDirectTransient private List<EntryProcessor<Object, Object, Object>> entryProcessors; @@ -175,9 +121,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat * * @param cacheId Cache ID. * @param nodeId Node ID. - * @param futVer Future version. - * @param fastMap Fast map scheme flag. - * @param updateVer Update version set if fast map is performed. + * @param futId Future ID. * @param topVer Topology version. * @param topLocked Topology locked flag. * @param syncMode Synchronization mode. @@ -190,16 +134,13 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat * @param taskNameHash Task name hash code. * @param skipStore Skip write-through to a persistent storage. * @param keepBinary Keep binary flag. - * @param clientReq Client node request flag. * @param addDepInfo Deployment info flag. * @param maxEntryCnt Maximum entries count. */ GridNearAtomicFullUpdateRequest( int cacheId, UUID nodeId, - GridCacheVersion futVer, - boolean fastMap, - @Nullable GridCacheVersion updateVer, + long futId, @NotNull AffinityTopologyVersion topVer, boolean topLocked, CacheWriteSynchronizationMode syncMode, @@ -210,34 +151,29 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat @Nullable CacheEntryPredicate[] filter, @Nullable UUID subjId, int taskNameHash, + boolean needPrimaryRes, boolean skipStore, boolean keepBinary, - boolean clientReq, boolean addDepInfo, int maxEntryCnt ) { - assert futVer != null; - - this.cacheId = cacheId; - this.nodeId = nodeId; - this.futVer = futVer; - this.fastMap = fastMap; - this.updateVer = updateVer; - - this.topVer = topVer; - this.topLocked = topLocked; - this.syncMode = syncMode; - this.op = op; - this.retval = retval; + super(cacheId, + nodeId, + futId, + topVer, + topLocked, + syncMode, + op, + retval, + subjId, + taskNameHash, + needPrimaryRes, + skipStore, + keepBinary, + addDepInfo); this.expiryPlc = expiryPlc; this.invokeArgs = invokeArgs; this.filter = filter; - this.subjId = subjId; - this.taskNameHash = taskNameHash; - this.skipStore = skipStore; - this.keepBinary = keepBinary; - this.clientReq = clientReq; - this.addDepInfo = addDepInfo; // By default ArrayList expands to array of 10 elements on first add. We cannot guess how many entries // will be added to request because of unknown affinity distribution. However, we DO KNOW how many keys @@ -246,84 +182,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat initSize = Math.min(maxEntryCnt, 10); keys = new ArrayList<>(initSize); - - partIds = new ArrayList<>(initSize); - } - - /** {@inheritDoc} */ - @Override public int lookupIndex() { - return CACHE_MSG_IDX; - } - - /** {@inheritDoc} */ - @Override public UUID nodeId() { - return nodeId; - } - - /** {@inheritDoc} */ - @Override public void nodeId(UUID nodeId) { - this.nodeId = nodeId; - } - - /** {@inheritDoc} */ - @Override public UUID subjectId() { - return subjId; - } - - /** {@inheritDoc} */ - @Override public int taskNameHash() { - return taskNameHash; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion futureVersion() { - return futVer; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion updateVersion() { - return updateVer; - } - - /** {@inheritDoc} */ - @Override public AffinityTopologyVersion topologyVersion() { - return topVer; - } - - /** {@inheritDoc} */ - @Override public CacheWriteSynchronizationMode writeSynchronizationMode() { - return syncMode; - } - - /** {@inheritDoc} */ - @Override public GridCacheOperation operation() { - return op; - } - - /** {@inheritDoc} */ - @Override public boolean onResponse(GridNearAtomicUpdateResponse res) { - if (this.res == null) { - this.res = res; - - return true; - } - - return false; - } - - /** {@inheritDoc} */ - @Override @Nullable public GridNearAtomicUpdateResponse response() { - return res; - } - - /** {@inheritDoc} */ - @Override public boolean addDeploymentInfo() { - return addDepInfo; - } - - /** {@inheritDoc} */ - @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { - return ctx.atomicMessageLogger(); } /** {@inheritDoc} */ @@ -331,8 +189,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat @Nullable Object val, long conflictTtl, long conflictExpireTime, - @Nullable GridCacheVersion conflictVer, - boolean primary) { + @Nullable GridCacheVersion conflictVer) { EntryProcessor<Object, Object, Object> entryProcessor = null; if (op == TRANSFORM) { @@ -344,7 +201,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat assert val != null || op == DELETE; keys.add(key); - partIds.add(key.partition()); if (entryProcessor != null) { if (entryProcessors == null) @@ -361,8 +217,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat vals.add((CacheObject)val); } - hasPrimary |= primary; - // In case there is no conflict, do not create the list. if (conflictVer != null) { if (conflictVers == null) { @@ -407,6 +261,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat /** {@inheritDoc} */ @Override public int size() { + assert keys != null; + return keys != null ? keys.size() : 0; } @@ -488,41 +344,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat } /** {@inheritDoc} */ - @Override public boolean fastMap() { - return fastMap; - } - - /** {@inheritDoc} */ - @Override public boolean topologyLocked() { - return topLocked; - } - - /** {@inheritDoc} */ - @Override public boolean clientRequest() { - return clientReq; - } - - /** {@inheritDoc} */ - @Override public boolean returnValue() { - return retval; - } - - /** {@inheritDoc} */ - @Override public boolean skipStore() { - return skipStore; - } - - /** {@inheritDoc} */ - @Override public boolean keepBinary() { - return keepBinary; - } - - /** {@inheritDoc} */ - @Override public boolean hasPrimary() { - return hasPrimary; - } - - /** {@inheritDoc} */ @Override @Nullable public CacheEntryPredicate[] filter() { return filter; } @@ -600,18 +421,13 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat } else finishUnmarshalCacheObjects(vals, cctx, ldr); - - if (partIds != null && !partIds.isEmpty()) { - assert partIds.size() == keys.size(); - - for (int i = 0; i < keys.size(); i++) - keys.get(i).partition(partIds.get(i)); - } } /** {@inheritDoc} */ @Override public int partition() { - return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1; + assert !F.isEmpty(keys); + + return keys.get(0).partition(); } /** {@inheritDoc} */ @@ -629,145 +445,55 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat } switch (writer.state()) { - case 3: - if (!writer.writeBoolean("clientReq", clientReq)) - return false; - - writer.incrementState(); - - case 4: - if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes)) - return false; - - writer.incrementState(); - - case 5: - if (!writer.writeMessage("conflictTtls", conflictTtls)) - return false; - - writer.incrementState(); - - case 6: - if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 7: - if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) - return false; - - writer.incrementState(); - - case 8: - if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes)) - return false; - - writer.incrementState(); - - case 9: - if (!writer.writeBoolean("fastMap", fastMap)) - return false; - - writer.incrementState(); - case 10: - if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes)) return false; writer.incrementState(); case 11: - if (!writer.writeMessage("futVer", futVer)) + if (!writer.writeMessage("conflictTtls", conflictTtls)) return false; writer.incrementState(); case 12: - if (!writer.writeBoolean("hasPrimary", hasPrimary)) + if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 13: - if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR)) + if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); case 14: - if (!writer.writeBoolean("keepBinary", keepBinary)) + if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes)) return false; writer.incrementState(); case 15: - if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) + if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 16: - if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1)) + if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); case 17: - if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT)) + if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 18: - if (!writer.writeBoolean("retval", retval)) - return false; - - writer.incrementState(); - - case 19: - if (!writer.writeBoolean("skipStore", skipStore)) - return false; - - writer.incrementState(); - - case 20: - if (!writer.writeUuid("subjId", subjId)) - return false; - - writer.incrementState(); - - case 21: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) - return false; - - writer.incrementState(); - - case 22: - if (!writer.writeInt("taskNameHash", taskNameHash)) - return false; - - writer.incrementState(); - - case 23: - if (!writer.writeBoolean("topLocked", topLocked)) - return false; - - writer.incrementState(); - - case 24: - if (!writer.writeMessage("topVer", topVer)) - return false; - - writer.incrementState(); - - case 25: - if (!writer.writeMessage("updateVer", updateVer)) - return false; - - writer.incrementState(); - - case 26: if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) return false; @@ -789,64 +515,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat return false; switch (reader.state()) { - case 3: - clientReq = reader.readBoolean("clientReq"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 4: - conflictExpireTimes = reader.readMessage("conflictExpireTimes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: - conflictTtls = reader.readMessage("conflictTtls"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 6: - conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 7: - entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 8: - expiryPlcBytes = reader.readByteArray("expiryPlcBytes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 9: - fastMap = reader.readBoolean("fastMap"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - case 10: - filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class); + conflictExpireTimes = reader.readMessage("conflictExpireTimes"); if (!reader.isLastRead()) return false; @@ -854,7 +524,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat reader.incrementState(); case 11: - futVer = reader.readMessage("futVer"); + conflictTtls = reader.readMessage("conflictTtls"); if (!reader.isLastRead()) return false; @@ -862,7 +532,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat reader.incrementState(); case 12: - hasPrimary = reader.readBoolean("hasPrimary"); + conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -870,7 +540,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat reader.incrementState(); case 13: - invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); + entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); if (!reader.isLastRead()) return false; @@ -878,7 +548,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat reader.incrementState(); case 14: - keepBinary = reader.readBoolean("keepBinary"); + expiryPlcBytes = reader.readByteArray("expiryPlcBytes"); if (!reader.isLastRead()) return false; @@ -886,7 +556,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat reader.incrementState(); case 15: - keys = reader.readCollection("keys", MessageCollectionItemType.MSG); + filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class); if (!reader.isLastRead()) return false; @@ -894,19 +564,15 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat reader.incrementState(); case 16: - byte opOrd; - - opOrd = reader.readByte("op"); + invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); if (!reader.isLastRead()) return false; - op = GridCacheOperation.fromOrdinal(opOrd); - reader.incrementState(); case 17: - partIds = reader.readCollection("partIds", MessageCollectionItemType.INT); + keys = reader.readCollection("keys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -914,74 +580,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat reader.incrementState(); case 18: - retval = reader.readBoolean("retval"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 19: - skipStore = reader.readBoolean("skipStore"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 20: - subjId = reader.readUuid("subjId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 21: - byte syncModeOrd; - - syncModeOrd = reader.readByte("syncMode"); - - if (!reader.isLastRead()) - return false; - - syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd); - - reader.incrementState(); - - case 22: - taskNameHash = reader.readInt("taskNameHash"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 23: - topLocked = reader.readBoolean("topLocked"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 24: - topVer = reader.readMessage("topVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 25: - updateVer = reader.readMessage("updateVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 26: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -1013,12 +611,13 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 27; + return 19; } /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridNearAtomicFullUpdateRequest.class, this, "filter", Arrays.toString(filter), + return S.toString(GridNearAtomicFullUpdateRequest.class, this, + "filter", Arrays.toString(filter), "parent", super.toString()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java index e0c24b2..78582b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java @@ -58,9 +58,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl * * @param cacheId Cache ID. * @param nodeId Node ID. - * @param futVer Future version. - * @param fastMap Fast map scheme flag. - * @param updateVer Update version set if fast map is performed. + * @param futId Future ID. * @param topVer Topology version. * @param topLocked Topology locked flag. * @param syncMode Synchronization mode. @@ -71,15 +69,12 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl * @param taskNameHash Task name hash code. * @param skipStore Skip write-through to a persistent storage. * @param keepBinary Keep binary flag. - * @param clientReq Client node request flag. * @param addDepInfo Deployment info flag. */ GridNearAtomicSingleUpdateFilterRequest( int cacheId, UUID nodeId, - GridCacheVersion futVer, - boolean fastMap, - @Nullable GridCacheVersion updateVer, + long futId, @NotNull AffinityTopologyVersion topVer, boolean topLocked, CacheWriteSynchronizationMode syncMode, @@ -88,17 +83,15 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl @Nullable CacheEntryPredicate[] filter, @Nullable UUID subjId, int taskNameHash, + boolean needPrimaryRes, boolean skipStore, boolean keepBinary, - boolean clientReq, boolean addDepInfo ) { super( cacheId, nodeId, - futVer, - fastMap, - updateVer, + futId, topVer, topLocked, syncMode, @@ -106,9 +99,9 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl retval, subjId, taskNameHash, + needPrimaryRes, skipStore, keepBinary, - clientReq, addDepInfo ); @@ -173,7 +166,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl } switch (writer.state()) { - case 14: + case 12: if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG)) return false; @@ -195,7 +188,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl return false; switch (reader.state()) { - case 14: + case 12: filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class); if (!reader.isLastRead()) @@ -215,7 +208,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 15; + return 13; } /** {@inheritDoc} */
