Implemented single update request.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0c560110 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0c560110 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0c560110 Branch: refs/heads/ignite-2523-1 Commit: 0c5601102a836177351e9428acdd3f0896fd0198 Parents: 5646ef0 Author: vozerov-gridgain <[email protected]> Authored: Tue Apr 19 14:54:12 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Apr 19 14:54:12 2016 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 6 + .../processors/cache/GridCacheIoManager.java | 17 + .../dht/atomic/GridDhtAtomicCache.java | 48 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 13 +- .../GridNearAtomicAbstractUpdateRequest.java | 239 +++++ .../GridNearAtomicSingleUpdateFuture.java | 130 ++- .../GridNearAtomicSingleUpdateRequest.java | 945 +++++++++++++++++++ .../dht/atomic/GridNearAtomicUpdateRequest.java | 217 ++--- .../distributed/near/GridNearAtomicCache.java | 8 +- 9 files changed, 1407 insertions(+), 216 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0c560110/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 47b1c5f..e028cf4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -68,6 +68,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlock import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; @@ -726,6 +727,11 @@ public class GridIoMessageFactory implements MessageFactory { break; + case -23: + msg = new GridNearAtomicSingleUpdateRequest(); + + break; + // [-3..119] [124] - this // [120..123] - DR // [-4..-22] - SQL http://git-wip-us.apache.org/repos/asf/ignite/blob/0c560110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index aab1bcc..82b7604 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; @@ -603,6 +604,22 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { break; + case -23: { + GridNearAtomicSingleUpdateRequest req = (GridNearAtomicSingleUpdateRequest)msg; + + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( + ctx.cacheId(), + nodeId, + req.futureVersion(), + ctx.deploymentEnabled()); + + res.error(req.classError()); + + sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); + } + + break; + default: throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message=" + msg + "]", msg.classError()); http://git-wip-us.apache.org/repos/asf/ignite/blob/0c560110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 013184b..cbda827 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -140,7 +140,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** Update reply closure. */ @GridToStringExclude - private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos; + private CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos; /** Pending */ private ConcurrentMap<UUID, DeferredResponseBuffer> pendingResponses = new ConcurrentHashMap8<>(); @@ -193,9 +193,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - updateReplyClos = new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { + updateReplyClos = new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() { @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { if (ctx.config().getAtomicWriteOrderMode() == CLOCK) { assert req.writeSynchronizationMode() != FULL_ASYNC : req; @@ -257,6 +257,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); + ctx.io().addHandler(ctx.cacheId(), GridNearAtomicSingleUpdateRequest.class, new CI2<UUID, GridNearAtomicSingleUpdateRequest>() { + @Override public void apply(UUID nodeId, GridNearAtomicSingleUpdateRequest req) { + processNearAtomicUpdateRequest(nodeId, req); + } + }); + ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() { @Override public void apply(UUID nodeId, GridNearAtomicUpdateResponse res) { processNearAtomicUpdateResponse(nodeId, res); @@ -1447,8 +1453,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ public void updateAllAsyncInternal( final UUID nodeId, - final GridNearAtomicUpdateRequest req, - final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb + final GridNearAtomicAbstractUpdateRequest req, + final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb ) { IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion()); @@ -1472,8 +1478,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ public void updateAllAsyncInternal0( UUID nodeId, - GridNearAtomicUpdateRequest req, - CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb + GridNearAtomicAbstractUpdateRequest req, + CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb ) { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(), ctx.deploymentEnabled()); @@ -1698,12 +1704,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private UpdateBatchResult updateWithBatch( final ClusterNode node, final boolean hasNear, - final GridNearAtomicUpdateRequest req, + final GridNearAtomicAbstractUpdateRequest req, final GridNearAtomicUpdateResponse res, final List<GridDhtCacheEntry> locked, final GridCacheVersion ver, @Nullable GridDhtAtomicUpdateFuture dhtFut, - final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, + final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb, final boolean replicate, final String taskName, @Nullable final IgniteCacheExpiryPolicy expiry, @@ -1712,7 +1718,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts. assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll. - if (!F.isEmpty(req.filter()) && ctx.loadPreviousValue()) { + if (req.hasFilter() && ctx.loadPreviousValue()) { try { reloadIfNeeded(locked); } @@ -1723,7 +1729,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } - int size = req.keys().size(); + int size = req.keysCount(); Map<KeyCacheObject, CacheObject> putMap = null; @@ -2117,12 +2123,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private UpdateSingleResult updateSingle( ClusterNode node, boolean hasNear, - GridNearAtomicUpdateRequest req, + GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res, List<GridDhtCacheEntry> locked, GridCacheVersion ver, @Nullable GridDhtAtomicUpdateFuture dhtFut, - CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, + CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb, boolean replicate, String taskName, @Nullable IgniteCacheExpiryPolicy expiry, @@ -2379,8 +2385,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable final Collection<KeyCacheObject> rmvKeys, @Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap, @Nullable GridDhtAtomicUpdateFuture dhtFut, - final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, - final GridNearAtomicUpdateRequest req, + final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb, + final GridNearAtomicAbstractUpdateRequest req, final GridNearAtomicUpdateResponse res, final boolean replicate, final UpdateBatchResult batchRes, @@ -2784,7 +2790,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * will return false. * @return {@code True} if filter evaluation succeeded. */ - private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicUpdateRequest req, + private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { try { return ctx.isAllLocked(entry, req.filter()); @@ -2799,7 +2805,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * @param req Request to remap. */ - private void remapToNewPrimary(GridNearAtomicUpdateRequest req) { + private void remapToNewPrimary(GridNearAtomicAbstractUpdateRequest req) { assert req.writeSynchronizationMode() == FULL_ASYNC : req; if (log.isDebugEnabled()) @@ -2816,7 +2822,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { drRmvVals = null; } else if (req.operation() == UPDATE) { - int size = req.keys().size(); + int size = req.keysCount(); drPutVals = new ArrayList<>(size); @@ -2878,9 +2884,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ @Nullable private GridDhtAtomicUpdateFuture createDhtFuture( GridCacheVersion writeVer, - GridNearAtomicUpdateRequest updateReq, + GridNearAtomicAbstractUpdateRequest updateReq, GridNearAtomicUpdateResponse updateRes, - CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, + CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb, boolean force ) { if (!force) { @@ -2911,7 +2917,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param nodeId Sender node ID. * @param req Near atomic update request. */ - private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicUpdateRequest req) { + private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) { if (log.isDebugEnabled()) log.debug("Processing near atomic update request [nodeId=" + nodeId + ", req=" + req + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/0c560110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 4721d6e..5dbf6c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -79,7 +79,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** Completion callback. */ @GridToStringExclude - private final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb; + private final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb; /** Mappings. */ @GridToStringInclude @@ -89,7 +89,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries; /** Update request. */ - private final GridNearAtomicUpdateRequest updateReq; + private final GridNearAtomicAbstractUpdateRequest updateReq; /** Update response. */ private final GridNearAtomicUpdateResponse updateRes; @@ -115,10 +115,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> */ public GridDhtAtomicUpdateFuture( GridCacheContext cctx, - CI2<GridNearAtomicUpdateRequest, - GridNearAtomicUpdateResponse> completionCb, + CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb, GridCacheVersion writeVer, - GridNearAtomicUpdateRequest updateReq, + GridNearAtomicAbstractUpdateRequest updateReq, GridNearAtomicUpdateResponse updateRes ) { this.cctx = cctx; @@ -132,8 +131,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> if (log == null) log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class); - keys = new ArrayList<>(updateReq.keys().size()); - mappings = U.newHashMap(updateReq.keys().size()); + keys = new ArrayList<>(updateReq.keysCount()); + mappings = U.newHashMap(updateReq.keysCount()); waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest())); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0c560110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java new file mode 100644 index 0000000..7bceb9b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheDeployable; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.jetbrains.annotations.Nullable; + +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.processor.EntryProcessor; +import java.util.List; +import java.util.UUID; + +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; + +/** + * Abstract near update request. + */ +public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable { + /** */ + private static final long serialVersionUID = 0L; + + /** Message index. */ + public static final int CACHE_MSG_IDX = nextIndexId(); + + /** {@inheritDoc} */ + @Override public int lookupIndex() { + return CACHE_MSG_IDX; + } + + /** + * @return Mapped node ID. + */ + public abstract UUID nodeId(); + + /** + * @param nodeId Node ID. + */ + public abstract void nodeId(UUID nodeId); + + /** + * @return Subject ID. + */ + public abstract UUID subjectId(); + + /** + * @return Task name hash. + */ + public abstract int taskNameHash(); + + /** + * @return Future version. + */ + public abstract GridCacheVersion futureVersion(); + + /** + * @return Flag indicating whether this is fast-map udpate. + */ + public abstract boolean fastMap(); + + /** + * @return Update version for fast-map request. + */ + public abstract GridCacheVersion updateVersion(); + + /** + * @return Topology locked flag. + */ + public abstract boolean topologyLocked(); + + /** + * @return {@code True} if request sent from client node. + */ + public abstract boolean clientRequest(); + + /** + * @return Cache write synchronization mode. + */ + public abstract CacheWriteSynchronizationMode writeSynchronizationMode(); + + /** + * @return Expiry policy. + */ + public abstract ExpiryPolicy expiry(); + + /** + * @return Return value flag. + */ + public abstract boolean returnValue(); + + /** + * @return Filter. + */ + // TODO + @Nullable public abstract CacheEntryPredicate[] filter(); + + /** + * Whether filter exist. + * + * @return {@code True} if exist. + */ + public abstract boolean hasFilter(); + + /** + * @return Skip write-through to a persistent storage. + */ + public abstract boolean skipStore(); + + /** + * @return Keep binary flag. + */ + public abstract boolean keepBinary(); + + /** + * @return Keys for this update request. + */ + // TODO + public abstract List<KeyCacheObject> keys(); + + /** + * @return Key number. + */ + public abstract int keysCount(); + + /** + * @return Values for this update request. + */ + // TODO + public abstract List<?> values(); + + /** + * @return Update operation. + */ + public abstract GridCacheOperation operation(); + + /** + * @return Optional arguments for entry processor. + */ + @Nullable public abstract Object[] invokeArguments(); + + /** + * @param idx Key index. + * @return Value. + */ + // TODO + @SuppressWarnings("unchecked") + public abstract CacheObject value(int idx); + + /** + * @param idx Key index. + * @return Entry processor. + */ + @SuppressWarnings("unchecked") + // TODO + public abstract EntryProcessor<Object, Object, Object> entryProcessor(int idx); + + /** + * @param idx Index to get. + * @return Write value - either value, or transform closure. + */ + // TODO + public abstract CacheObject writeValue(int idx); + + /** + * @return Conflict versions. + */ + // TODO + @Nullable public abstract List<GridCacheVersion> conflictVersions(); + + /** + * @param idx Index. + * @return Conflict version. + */ + // TODO + @Nullable public abstract GridCacheVersion conflictVersion(int idx); + + /** + * @param idx Index. + * @return Conflict TTL. + */ + // TODO + public abstract long conflictTtl(int idx); + + /** + * @param idx Index. + * @return Conflict expire time. + */ + // TODO + public abstract long conflictExpireTime(int idx); + + /** + * @return Flag indicating whether this request contains primary keys. + */ + public abstract boolean hasPrimary(); + + /** + * @param res Response. + * @return {@code True} if current response was {@code null}. + */ + // TODO + public abstract boolean onResponse(GridNearAtomicUpdateResponse res); + + /** + * @return Response. + */ + // TODO + @Nullable public abstract GridNearAtomicUpdateResponse response(); + + /** + * Cleanup values. + * + * @param clearKeys If {@code true} clears keys. + */ + public abstract void cleanup(boolean clearKeys); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0c560110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 7a95640..266ff21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -48,6 +48,7 @@ import javax.cache.expiry.ExpiryPolicy; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -67,7 +68,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda private Object val; /** Not null is operation is mapped to single node. */ - private GridNearAtomicUpdateRequest req; + private GridNearAtomicAbstractUpdateRequest req; /** * @param cctx Cache context. @@ -128,7 +129,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda GridNearAtomicUpdateResponse res = null; synchronized (mux) { - GridNearAtomicUpdateRequest req = this.req.nodeId().equals(nodeId) ? this.req : null; + GridNearAtomicAbstractUpdateRequest req = this.req.nodeId().equals(nodeId) ? this.req : null; if (req != null && req.response() == null) { res = new GridNearAtomicUpdateResponse(cctx.cacheId(), @@ -191,7 +192,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda */ @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { - GridNearAtomicUpdateRequest req; + GridNearAtomicAbstractUpdateRequest req; AffinityTopologyVersion remapTopVer = null; @@ -367,7 +368,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda * @param req Update request. * @param res Update response. */ - private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { assert nearEnabled; if (res.remapKeys() != null || !req.hasPrimary()) @@ -438,11 +439,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda * @param nodeId Node ID. * @param req Request. */ - private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) { + private void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) { if (cctx.localNodeId().equals(nodeId)) { cache.updateAllAsyncInternal(nodeId, req, - new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { - @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() { + @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { onResult(res.nodeId(), res, false); } }); @@ -467,7 +468,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda * @param req Request. * @param e Error. */ - void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) { + void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) { synchronized (mux) { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), req.nodeId(), @@ -492,7 +493,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda } Exception err = null; - GridNearAtomicUpdateRequest singleReq0 = null; + GridNearAtomicAbstractUpdateRequest singleReq0 = null; GridCacheVersion futVer = cctx.versions().next(topVer); @@ -581,7 +582,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda * @return Request. * @throws Exception If failed. */ - private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer, + private GridNearAtomicAbstractUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer, GridCacheVersion futVer, @Nullable GridCacheVersion updVer) throws Exception { if (key == null) @@ -597,40 +598,89 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda if (op != TRANSFORM) val = cctx.toCacheObject(val); - ClusterNode primary = cctx.affinity().primary(cacheKey, topVer); + List<ClusterNode> affNodes = cctx.affinity().nodes(cacheKey, topVer); - if (primary == null) + if (F.isEmpty(affNodes)) throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + "left the grid)."); - GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest( - cctx.cacheId(), - primary.id(), - futVer, - false, - updVer, - topVer, - topLocked, - syncMode, - op, - retval, - expiryPlc, - invokeArgs, - filter, - subjId, - taskNameHash, - skipStore, - keepBinary, - cctx.kernalContext().clientNode(), - cctx.deploymentEnabled(), - 1); - - req.addUpdateEntry(cacheKey, - val, - CU.TTL_NOT_CHANGED, - CU.EXPIRE_TIME_CALCULATE, - null, - true); + // TODO: Can we change it to affNodes.get(0)? + ClusterNode primary = cctx.affinity().primary(cacheKey, topVer); + + assert primary != null; + + boolean single = true; + + // TODO: Instead, we could implement a method on affinity to check if all nodes are greater than particular ver. + for (ClusterNode affNode : affNodes) { + if (!affNode.version().greaterThanEqual(1, 1, 6)) { + single = false; + + break; + } + } + + GridNearAtomicAbstractUpdateRequest req; + + if (single) { + GridNearAtomicSingleUpdateRequest req0 = new GridNearAtomicSingleUpdateRequest( + cctx.cacheId(), + primary.id(), + futVer, + false, + updVer, + topVer, + topLocked, + syncMode, + op, + retval, + expiryPlc, + invokeArgs, + filter, + subjId, + taskNameHash, + skipStore, + keepBinary, + cctx.kernalContext().clientNode(), + cctx.deploymentEnabled(), + 1); + + req0.addUpdateEntry(cacheKey, val, true); + + req = req0; + } + else { + GridNearAtomicUpdateRequest req0 = new GridNearAtomicUpdateRequest( + cctx.cacheId(), + primary.id(), + futVer, + false, + updVer, + topVer, + topLocked, + syncMode, + op, + retval, + expiryPlc, + invokeArgs, + filter, + subjId, + taskNameHash, + skipStore, + keepBinary, + cctx.kernalContext().clientNode(), + cctx.deploymentEnabled(), + 1); + + req0.addUpdateEntry(cacheKey, + val, + CU.TTL_NOT_CHANGED, + CU.EXPIRE_TIME_CALCULATE, + null, + true); + + req = req0; + } return req; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0c560110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java new file mode 100644 index 0000000..741a4d3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java @@ -0,0 +1,945 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.processor.EntryProcessor; +import java.io.Externalizable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; + +/** + * Atomic near single update request. + */ +public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpdateRequest { + /** */ + private static final long serialVersionUID = 0L; + + /** Target node ID. */ + @GridDirectTransient + private UUID nodeId; + + /** Future version. */ + private GridCacheVersion futVer; + + /** Fast map flag. */ + private boolean fastMap; + + /** Update version. Set to non-null if fastMap is {@code true}. */ + private GridCacheVersion updateVer; + + /** Topology version. */ + private AffinityTopologyVersion topVer; + + /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */ + private boolean topLocked; + + /** Write synchronization mode. */ + private CacheWriteSynchronizationMode syncMode; + + /** Update operation. */ + private GridCacheOperation op; + + /** Keys to update. */ + @GridToStringInclude + @GridDirectCollection(KeyCacheObject.class) + private List<KeyCacheObject> keys; + + /** Values to update. */ + @GridDirectCollection(CacheObject.class) + private List<CacheObject> vals; + + /** Entry processors. */ + @GridDirectTransient + private List<EntryProcessor<Object, Object, Object>> entryProcessors; + + /** Entry processors bytes. */ + @GridDirectCollection(byte[].class) + private List<byte[]> entryProcessorsBytes; + + /** Optional arguments for entry processor. */ + @GridDirectTransient + private Object[] invokeArgs; + + /** Entry processor arguments bytes. */ + private byte[][] invokeArgsBytes; + + /** Conflict versions. */ + @GridDirectCollection(GridCacheVersion.class) + private List<GridCacheVersion> conflictVers; + + /** Conflict TTLs. */ + private GridLongList conflictTtls; + + /** Conflict expire times. */ + private GridLongList conflictExpireTimes; + + /** Return value flag. */ + private boolean retval; + + /** Expiry policy. */ + @GridDirectTransient + private ExpiryPolicy expiryPlc; + + /** Expiry policy bytes. */ + private byte[] expiryPlcBytes; + + /** Filter. */ + private CacheEntryPredicate[] filter; + + /** Flag indicating whether request contains primary keys. */ + private boolean hasPrimary; + + /** Subject ID. */ + private UUID subjId; + + /** Task name hash. */ + private int taskNameHash; + + /** Skip write-through to a persistent storage. */ + private boolean skipStore; + + /** */ + private boolean clientReq; + + /** Keep binary flag. */ + private boolean keepBinary; + + /** */ + @GridDirectTransient + private GridNearAtomicUpdateResponse res; + + /** Maximum possible size of inner collections. */ + @GridDirectTransient + private int initSize; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridNearAtomicSingleUpdateRequest() { + // No-op. + } + + /** + * Constructor. + * + * @param cacheId Cache ID. + * @param nodeId Node ID. + * @param futVer Future version. + * @param fastMap Fast map scheme flag. + * @param updateVer Update version set if fast map is performed. + * @param topVer Topology version. + * @param topLocked Topology locked flag. + * @param syncMode Synchronization mode. + * @param op Cache update operation. + * @param retval Return value required flag. + * @param expiryPlc Expiry policy. + * @param invokeArgs Optional arguments for entry processor. + * @param filter Optional filter for atomic check. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. + * @param skipStore Skip write-through to a persistent storage. + * @param keepBinary Keep binary flag. + * @param clientReq Client node request flag. + * @param addDepInfo Deployment info flag. + * @param maxEntryCnt Maximum entries count. + */ + public GridNearAtomicSingleUpdateRequest( + int cacheId, + UUID nodeId, + GridCacheVersion futVer, + boolean fastMap, + @Nullable GridCacheVersion updateVer, + @NotNull AffinityTopologyVersion topVer, + boolean topLocked, + CacheWriteSynchronizationMode syncMode, + GridCacheOperation op, + boolean retval, + @Nullable ExpiryPolicy expiryPlc, + @Nullable Object[] invokeArgs, + @Nullable CacheEntryPredicate[] filter, + @Nullable UUID subjId, + int taskNameHash, + boolean skipStore, + boolean keepBinary, + boolean clientReq, + boolean addDepInfo, + int maxEntryCnt + ) { + assert futVer != null; + + this.cacheId = cacheId; + this.nodeId = nodeId; + this.futVer = futVer; + this.fastMap = fastMap; + this.updateVer = updateVer; + + this.topVer = topVer; + this.topLocked = topLocked; + this.syncMode = syncMode; + this.op = op; + this.retval = retval; + this.expiryPlc = expiryPlc; + this.invokeArgs = invokeArgs; + this.filter = filter; + this.subjId = subjId; + this.taskNameHash = taskNameHash; + this.skipStore = skipStore; + this.keepBinary = keepBinary; + this.clientReq = clientReq; + this.addDepInfo = addDepInfo; + + // By default ArrayList expands to array of 10 elements on first add. We cannot guess how many entries + // will be added to request because of unknown affinity distribution. However, we DO KNOW how many keys + // participate in request. As such, we know upper bound of all collections in request. If this bound is lower + // than 10, we use it. + initSize = Math.min(maxEntryCnt, 10); + + keys = new ArrayList<>(initSize); + } + + /** {@inheritDoc} */ + @Override public UUID nodeId() { + return nodeId; + } + + /** {@inheritDoc} */ + @Override public void nodeId(UUID nodeId) { + this.nodeId = nodeId; + } + + /** {@inheritDoc} */ + @Override public UUID subjectId() { + return subjId; + } + + /** {@inheritDoc} */ + @Override public int taskNameHash() { + return taskNameHash; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion futureVersion() { + return futVer; + } + + /** {@inheritDoc} */ + @Override public boolean fastMap() { + return fastMap; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion updateVersion() { + return updateVer; + } + + /** {@inheritDoc} */ + @Override public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** {@inheritDoc} */ + @Override public boolean topologyLocked() { + return topLocked; + } + + /** {@inheritDoc} */ + @Override public boolean clientRequest() { + return clientReq; + } + + /** {@inheritDoc} */ + @Override public CacheWriteSynchronizationMode writeSynchronizationMode() { + return syncMode; + } + + /** {@inheritDoc} */ + @Override public ExpiryPolicy expiry() { + return expiryPlc; + } + + /** {@inheritDoc} */ + @Override public boolean returnValue() { + return retval; + } + + /** {@inheritDoc} */ + @Override @Nullable public CacheEntryPredicate[] filter() { + return filter; + } + + /** {@inheritDoc} */ + @Override public boolean hasFilter() { + return !F.isEmpty(filter); + } + + /** {@inheritDoc} */ + @Override public boolean skipStore() { + return skipStore; + } + + /** {@inheritDoc} */ + @Override public boolean keepBinary() { + return keepBinary; + } + + /** + * @param key Key to add. + * @param val Optional update value. + * @param primary If given key is primary on this mapping. + */ + @SuppressWarnings("unchecked") + public void addUpdateEntry(KeyCacheObject key, @Nullable Object val, boolean primary) { + EntryProcessor<Object, Object, Object> entryProcessor = null; + + if (op == TRANSFORM) { + assert val instanceof EntryProcessor : val; + + entryProcessor = (EntryProcessor<Object, Object, Object>) val; + } + + assert val != null || op == DELETE; + + keys.add(key); + + if (entryProcessor != null) { + if (entryProcessors == null) + entryProcessors = new ArrayList<>(initSize); + + entryProcessors.add(entryProcessor); + } + else if (val != null) { + assert val instanceof CacheObject : val; + + if (vals == null) + vals = new ArrayList<>(initSize); + + vals.add((CacheObject)val); + } + + hasPrimary |= primary; + } + + /** {@inheritDoc} */ + @Override public List<KeyCacheObject> keys() { + return keys; + } + + /** {@inheritDoc} */ + @Override public int keysCount() { + return keys.size(); + } + + /** {@inheritDoc} */ + @Override public List<?> values() { + return op == TRANSFORM ? entryProcessors : vals; + } + + /** {@inheritDoc} */ + @Override public GridCacheOperation operation() { + return op; + } + + /** {@inheritDoc} */ + @Override @Nullable public Object[] invokeArguments() { + return invokeArgs; + } + + /** {@inheritDoc} */ + @Override public CacheObject value(int idx) { + assert op == UPDATE : op; + + return vals.get(idx); + } + + /** {@inheritDoc} */ + @Override public EntryProcessor<Object, Object, Object> entryProcessor(int idx) { + assert op == TRANSFORM : op; + + return entryProcessors.get(idx); + } + + /** {@inheritDoc} */ + @Override public CacheObject writeValue(int idx) { + if (vals != null) + return vals.get(idx); + + return null; + } + + /** {@inheritDoc} */ + @Override @Nullable public List<GridCacheVersion> conflictVersions() { + return conflictVers; + } + + /** {@inheritDoc} */ + @Override @Nullable public GridCacheVersion conflictVersion(int idx) { + if (conflictVers != null) { + assert idx >= 0 && idx < conflictVers.size(); + + return conflictVers.get(idx); + } + + return null; + } + + /** {@inheritDoc} */ + @Override public long conflictTtl(int idx) { + if (conflictTtls != null) { + assert idx >= 0 && idx < conflictTtls.size(); + + return conflictTtls.get(idx); + } + + return CU.TTL_NOT_CHANGED; + } + + /** {@inheritDoc} */ + @Override public long conflictExpireTime(int idx) { + if (conflictExpireTimes != null) { + assert idx >= 0 && idx < conflictExpireTimes.size(); + + return conflictExpireTimes.get(idx); + } + + return CU.EXPIRE_TIME_CALCULATE; + } + + /** {@inheritDoc} */ + @Override public boolean hasPrimary() { + return hasPrimary; + } + + /** {@inheritDoc} */ + @Override public boolean onResponse(GridNearAtomicUpdateResponse res) { + if (this.res == null) { + this.res = res; + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override @Nullable public GridNearAtomicUpdateResponse response() { + return res; + } + + /** {@inheritDoc} + * @param ctx*/ + @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + GridCacheContext cctx = ctx.cacheContext(cacheId); + + prepareMarshalCacheObjects(keys, cctx); + + if (filter != null) { + boolean hasFilter = false; + + for (CacheEntryPredicate p : filter) { + if (p != null) { + hasFilter = true; + + p.prepareMarshal(cctx); + } + } + + if (!hasFilter) + filter = null; + } + + if (expiryPlc != null && expiryPlcBytes == null) + expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc)); + + if (op == TRANSFORM) { + // force addition of deployment info for entry processors if P2P is enabled globally. + if (!addDepInfo && ctx.deploymentEnabled()) + addDepInfo = true; + + if (entryProcessorsBytes == null) + entryProcessorsBytes = marshalCollection(entryProcessors, cctx); + + if (invokeArgsBytes == null) + invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx); + } + else + prepareMarshalCacheObjects(vals, cctx); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + GridCacheContext cctx = ctx.cacheContext(cacheId); + + finishUnmarshalCacheObjects(keys, cctx, ldr); + + if (op == TRANSFORM) { + if (entryProcessors == null) + entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr); + + if (invokeArgs == null) + invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr); + } + else + finishUnmarshalCacheObjects(vals, cctx, ldr); + + if (filter != null) { + for (CacheEntryPredicate p : filter) { + if (p != null) + p.finishUnmarshal(cctx, ldr); + } + } + + if (expiryPlcBytes != null && expiryPlc == null) + expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return addDepInfo; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeBoolean("clientReq", clientReq)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeMessage("conflictTtls", conflictTtls)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 7: + if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) + return false; + + writer.incrementState(); + + case 8: + if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes)) + return false; + + writer.incrementState(); + + case 9: + if (!writer.writeBoolean("fastMap", fastMap)) + return false; + + writer.incrementState(); + + case 10: + if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 11: + if (!writer.writeMessage("futVer", futVer)) + return false; + + writer.incrementState(); + + case 12: + if (!writer.writeBoolean("hasPrimary", hasPrimary)) + return false; + + writer.incrementState(); + + case 13: + if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR)) + return false; + + writer.incrementState(); + + case 14: + if (!writer.writeBoolean("keepBinary", keepBinary)) + return false; + + writer.incrementState(); + + case 15: + if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 16: + if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1)) + return false; + + writer.incrementState(); + + case 17: + if (!writer.writeBoolean("retval", retval)) + return false; + + writer.incrementState(); + + case 18: + if (!writer.writeBoolean("skipStore", skipStore)) + return false; + + writer.incrementState(); + + case 19: + if (!writer.writeUuid("subjId", subjId)) + return false; + + writer.incrementState(); + + case 20: + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + return false; + + writer.incrementState(); + + case 21: + if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + writer.incrementState(); + + case 22: + if (!writer.writeBoolean("topLocked", topLocked)) + return false; + + writer.incrementState(); + + case 23: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 24: + if (!writer.writeMessage("updateVer", updateVer)) + return false; + + writer.incrementState(); + + case 25: + if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + clientReq = reader.readBoolean("clientReq"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + conflictExpireTimes = reader.readMessage("conflictExpireTimes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + conflictTtls = reader.readMessage("conflictTtls"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: + entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 8: + expiryPlcBytes = reader.readByteArray("expiryPlcBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: + fastMap = reader.readBoolean("fastMap"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 10: + filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: + futVer = reader.readMessage("futVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 12: + hasPrimary = reader.readBoolean("hasPrimary"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 13: + invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 14: + keepBinary = reader.readBoolean("keepBinary"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 15: + keys = reader.readCollection("keys", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 16: + byte opOrd; + + opOrd = reader.readByte("op"); + + if (!reader.isLastRead()) + return false; + + op = GridCacheOperation.fromOrdinal(opOrd); + + reader.incrementState(); + + case 17: + retval = reader.readBoolean("retval"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 18: + skipStore = reader.readBoolean("skipStore"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 19: + subjId = reader.readUuid("subjId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 20: + byte syncModeOrd; + + syncModeOrd = reader.readByte("syncMode"); + + if (!reader.isLastRead()) + return false; + + syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd); + + reader.incrementState(); + + case 21: + taskNameHash = reader.readInt("taskNameHash"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 22: + topLocked = reader.readBoolean("topLocked"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 23: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 24: + updateVer = reader.readMessage("updateVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 25: + vals = reader.readCollection("vals", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridNearAtomicUpdateRequest.class); + } + + /** {@inheritDoc} */ + @Override public void cleanup(boolean clearKeys) { + vals = null; + entryProcessors = null; + entryProcessorsBytes = null; + invokeArgs = null; + invokeArgsBytes = null; + + if (clearKeys) + keys = null; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -23; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 26; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridNearAtomicSingleUpdateRequest.class, this, "filter", Arrays.toString(filter), + "parent", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0c560110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index 375c02f..f061868 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -33,8 +33,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -42,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -58,13 +57,10 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPD /** * Lite DHT cache update request sent from near node to primary node. */ -public class GridNearAtomicUpdateRequest extends GridCacheMessage implements GridCacheDeployable { +public class GridNearAtomicUpdateRequest extends GridNearAtomicAbstractUpdateRequest { /** */ private static final long serialVersionUID = 0L; - /** Message index. */ - public static final int CACHE_MSG_IDX = nextIndexId(); - /** Target node ID. */ @GridDirectTransient private UUID nodeId; @@ -249,119 +245,87 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri } /** {@inheritDoc} */ - @Override public int lookupIndex() { - return CACHE_MSG_IDX; - } - - /** - * @return Mapped node ID. - */ - public UUID nodeId() { + @Override public UUID nodeId() { return nodeId; } - /** - * @param nodeId Node ID. - */ - public void nodeId(UUID nodeId) { + /** {@inheritDoc} */ + @Override public void nodeId(UUID nodeId) { this.nodeId = nodeId; } - /** - * @return Subject ID. - */ - public UUID subjectId() { + /** {@inheritDoc} */ + @Override public UUID subjectId() { return subjId; } - /** - * @return Task name hash. - */ - public int taskNameHash() { + /** {@inheritDoc} */ + @Override public int taskNameHash() { return taskNameHash; } - /** - * @return Future version. - */ - public GridCacheVersion futureVersion() { + /** {@inheritDoc} */ + @Override public GridCacheVersion futureVersion() { return futVer; } - /** - * @return Flag indicating whether this is fast-map udpate. - */ - public boolean fastMap() { + /** {@inheritDoc} */ + @Override public boolean fastMap() { return fastMap; } - /** - * @return Update version for fast-map request. - */ - public GridCacheVersion updateVersion() { + /** {@inheritDoc} */ + @Override public GridCacheVersion updateVersion() { return updateVer; } - /** - * @return Topology version. - */ + /** {@inheritDoc} */ @Override public AffinityTopologyVersion topologyVersion() { return topVer; } - /** - * @return Topology locked flag. - */ - public boolean topologyLocked() { + /** {@inheritDoc} */ + @Override public boolean topologyLocked() { return topLocked; } - /** - * @return {@code True} if request sent from client node. - */ - public boolean clientRequest() { + /** {@inheritDoc} */ + @Override public boolean clientRequest() { return clientReq; } - /** - * @return Cache write synchronization mode. - */ - public CacheWriteSynchronizationMode writeSynchronizationMode() { + /** {@inheritDoc} */ + @Override public CacheWriteSynchronizationMode writeSynchronizationMode() { return syncMode; } - /** - * @return Expiry policy. - */ - public ExpiryPolicy expiry() { + /** {@inheritDoc} */ + @Override public ExpiryPolicy expiry() { return expiryPlc; } - /** - * @return Return value flag. - */ - public boolean returnValue() { + /** {@inheritDoc} */ + @Override public boolean returnValue() { return retval; } - /** - * @return Filter. - */ - @Nullable public CacheEntryPredicate[] filter() { + /** {@inheritDoc} */ + @Override @Nullable public CacheEntryPredicate[] filter() { return filter; } - /** - * @return Skip write-through to a persistent storage. - */ - public boolean skipStore() { + /** {@inheritDoc} */ + @Override public boolean hasFilter() { + return !F.isEmpty(filter); + } + + /** {@inheritDoc} */ + @Override public boolean skipStore() { return skipStore; } - /** - * @return Keep binary flag. - */ - public boolean keepBinary() { + /** {@inheritDoc} */ + @Override public boolean keepBinary() { return keepBinary; } @@ -373,6 +337,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri * @param conflictVer Conflict version (optional). * @param primary If given key is primary on this mapping. */ + @SuppressWarnings("unchecked") public void addUpdateEntry(KeyCacheObject key, @Nullable Object val, long conflictTtl, @@ -445,79 +410,60 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri } } - /** - * @return Keys for this update request. - */ - public List<KeyCacheObject> keys() { + /** {@inheritDoc} */ + @Override public List<KeyCacheObject> keys() { return keys; } - /** - * @return Values for this update request. - */ - public List<?> values() { + /** {@inheritDoc} */ + @Override public int keysCount() { + return keys.size(); + } + + /** {@inheritDoc} */ + @Override public List<?> values() { return op == TRANSFORM ? entryProcessors : vals; } - /** - * @return Update operation. - */ - public GridCacheOperation operation() { + /** {@inheritDoc} */ + @Override public GridCacheOperation operation() { return op; } - /** - * @return Optional arguments for entry processor. - */ - @Nullable public Object[] invokeArguments() { + /** {@inheritDoc} */ + @Override @Nullable public Object[] invokeArguments() { return invokeArgs; } - /** - * @param idx Key index. - * @return Value. - */ - @SuppressWarnings("unchecked") - public CacheObject value(int idx) { + /** {@inheritDoc} */ + @Override public CacheObject value(int idx) { assert op == UPDATE : op; return vals.get(idx); } - /** - * @param idx Key index. - * @return Entry processor. - */ - @SuppressWarnings("unchecked") - public EntryProcessor<Object, Object, Object> entryProcessor(int idx) { + /** {@inheritDoc} */ + @Override public EntryProcessor<Object, Object, Object> entryProcessor(int idx) { assert op == TRANSFORM : op; return entryProcessors.get(idx); } - /** - * @param idx Index to get. - * @return Write value - either value, or transform closure. - */ - public CacheObject writeValue(int idx) { + /** {@inheritDoc} */ + @Override public CacheObject writeValue(int idx) { if (vals != null) return vals.get(idx); return null; } - /** - * @return Conflict versions. - */ - @Nullable public List<GridCacheVersion> conflictVersions() { + /** {@inheritDoc} */ + @Override @Nullable public List<GridCacheVersion> conflictVersions() { return conflictVers; } - /** - * @param idx Index. - * @return Conflict version. - */ - @Nullable public GridCacheVersion conflictVersion(int idx) { + /** {@inheritDoc} */ + @Override @Nullable public GridCacheVersion conflictVersion(int idx) { if (conflictVers != null) { assert idx >= 0 && idx < conflictVers.size(); @@ -527,11 +473,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri return null; } - /** - * @param idx Index. - * @return Conflict TTL. - */ - public long conflictTtl(int idx) { + /** {@inheritDoc} */ + @Override public long conflictTtl(int idx) { if (conflictTtls != null) { assert idx >= 0 && idx < conflictTtls.size(); @@ -541,11 +484,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri return CU.TTL_NOT_CHANGED; } - /** - * @param idx Index. - * @return Conflict expire time. - */ - public long conflictExpireTime(int idx) { + /** {@inheritDoc} */ + @Override public long conflictExpireTime(int idx) { if (conflictExpireTimes != null) { assert idx >= 0 && idx < conflictExpireTimes.size(); @@ -555,18 +495,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri return CU.EXPIRE_TIME_CALCULATE; } - /** - * @return Flag indicating whether this request contains primary keys. - */ - public boolean hasPrimary() { + /** {@inheritDoc} */ + @Override public boolean hasPrimary() { return hasPrimary; } - /** - * @param res Response. - * @return {@code True} if current response was {@code null}. - */ - public boolean onResponse(GridNearAtomicUpdateResponse res) { + /** {@inheritDoc} */ + @Override public boolean onResponse(GridNearAtomicUpdateResponse res) { if (this.res == null) { this.res = res; @@ -576,10 +511,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri return false; } - /** - * @return Response. - */ - @Nullable public GridNearAtomicUpdateResponse response() { + /** {@inheritDoc} */ + @Override @Nullable public GridNearAtomicUpdateResponse response() { return res; } @@ -1025,12 +958,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri return reader.afterMessageRead(GridNearAtomicUpdateRequest.class); } - /** - * Cleanup values. - * - * @param clearKeys If {@code true} clears keys. - */ - public void cleanup(boolean clearKeys) { + /** {@inheritDoc} */ + @Override public void cleanup(boolean clearKeys) { vals = null; entryProcessors = null; entryProcessorsBytes = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/0c560110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index a7481d3..35e554d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -45,7 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvali import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; @@ -127,10 +127,10 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { * @param res Update response. */ public void processNearAtomicUpdateResponse( - GridNearAtomicUpdateRequest req, + GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res ) { - if (F.size(res.failedKeys()) == req.keys().size()) + if (F.size(res.failedKeys()) == req.keysCount()) return; /* @@ -153,7 +153,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); - for (int i = 0; i < req.keys().size(); i++) { + for (int i = 0; i < req.keysCount(); i++) { if (F.contains(skipped, i)) continue;
