Single update POC for atomic cache.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1d310860 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1d310860 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1d310860 Branch: refs/heads/ignite-1843 Commit: 1d31086038a0d8359f4bb1304a8ad8b6d9370a69 Parents: 77a3f64 Author: sboikov <[email protected]> Authored: Tue Nov 10 15:45:02 2015 +0300 Committer: sboikov <[email protected]> Committed: Tue Nov 10 15:45:02 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAtomicFuture.java | 11 + .../processors/cache/GridCacheMessage.java | 18 + .../dht/atomic/GridDhtAtomicCache.java | 460 +++++++- .../atomic/GridDhtAtomicSingleUpdateFuture.java | 401 +++++++ .../dht/atomic/GridDhtAtomicUpdateFuture.java | 13 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 177 ++- .../GridNearAtomicSingleUpdateFuture.java | 1093 ++++++++++++++++++ .../dht/atomic/GridNearAtomicUpdateFuture.java | 20 +- .../dht/atomic/GridNearAtomicUpdateRequest.java | 187 ++- .../distributed/near/GridNearAtomicCache.java | 78 +- 10 files changed, 2326 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java index be35c5c..15f004a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java @@ -18,8 +18,11 @@ package org.apache.ignite.internal.processors.cache; import java.util.Collection; +import java.util.UUID; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; /** * Update future for atomic cache. @@ -37,4 +40,12 @@ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> { * @return Future keys. */ public Collection<?> keys(); + + public void map(); + + public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse res); + + public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res); + + public void onResult(UUID nodeId); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index bdd2118..45a4b87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -511,6 +511,15 @@ public abstract class GridCacheMessage implements Message { } } + protected final void prepareMarshalCacheObject(@Nullable CacheObject obj, GridCacheContext ctx) throws IgniteCheckedException { + if (obj != null) { + obj.prepareMarshal(ctx.cacheObjectContext()); + + if (addDepInfo) + prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx); + } + } + /** * @param col Collection. * @param ctx Cache context. @@ -556,6 +565,15 @@ public abstract class GridCacheMessage implements Message { } } + protected final void finishUnmarshalCacheObject(@Nullable CacheObject obj, + GridCacheContext ctx, + ClassLoader ldr) + throws IgniteCheckedException + { + if (obj != null) + obj.finishUnmarshal(ctx.cacheObjectContext(), ldr); + } + /** * @param col Collection. * @param ctx Context. http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 7f9edb2..da5cb6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.CacheMetricsImpl; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheOperationContext; import org.apache.ignite.internal.processors.cache.CacheStorePartialUpdateException; +import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -374,10 +375,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @SuppressWarnings("unchecked") @Override public IgniteInternalFuture<Boolean> putAsync0(K key, V val, @Nullable CacheEntryPredicate... filter) { A.notNull(key, "key"); + A.notNull(val, "val"); - return updateAllAsync0(F0.asMap(key, val), - null, - null, + return snigleUpdateAllAsync0(key, + val, null, null, false, @@ -776,6 +777,57 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { true); } + private IgniteInternalFuture snigleUpdateAllAsync0( + K key, + V val, + @Nullable final Map<? extends K, ? extends EntryProcessor> invokeMap, + @Nullable Object[] invokeArgs, + final boolean retval, + final boolean rawRetval, + @Nullable final CacheEntryPredicate[] filter, + final boolean waitTopFut + ) { + assert ctx.updatesAllowed(); + + if (map != null && keyCheck) + validateCacheKeys(map.keySet()); + + ctx.checkSecurity(SecurityPermission.CACHE_PUT); + + CacheOperationContext opCtx = ctx.operationContextPerCall(); + + UUID subjId = ctx.subjectIdPerCall(null, opCtx); + + int taskNameHash = ctx.kernalContext().job().currentTaskNameHash(); + + final GridNearAtomicSingleUpdateFuture updateFut = new GridNearAtomicSingleUpdateFuture( + ctx, + this, + ctx.config().getWriteSynchronizationMode(), + invokeMap != null ? TRANSFORM : UPDATE, + key, + val, + invokeArgs, + retval, + rawRetval, + opCtx != null ? opCtx.expiry() : null, + filter, + subjId, + taskNameHash, + opCtx != null && opCtx.skipStore(), + opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, + waitTopFut); + + return asyncOp(new CO<IgniteInternalFuture<Object>>() { + @Override public IgniteInternalFuture<Object> apply() { + updateFut.map(); + + return updateFut; + } + }); + } + + /** * Entry point for all public API put/transform methods. * @@ -1054,7 +1106,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final GridNearAtomicUpdateRequest req, final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb ) { - IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion()); + IgniteInternalFuture<Object> forceFut = preldr.request(req.singleUpdate() ? Collections.singleton(req.singleKey()) : req.keys(), req.topologyVersion()); if (forceFut.isDone()) updateAllAsyncInternal0(nodeId, req, completionCb); @@ -1082,11 +1134,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(), ctx.deploymentEnabled()); - List<KeyCacheObject> keys = req.keys(); + boolean single = req.singleUpdate(); - assert !req.returnValue() || (req.operation() == TRANSFORM || keys.size() == 1); + assert !req.returnValue() || (req.operation() == TRANSFORM || single || req.keys().size() == 1); - GridDhtAtomicUpdateFuture dhtFut = null; + GridCacheAtomicFuture dhtFut = null; boolean remap = false; @@ -1097,7 +1149,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { try { // If batch store update is enabled, we need to lock all entries. // First, need to acquire locks on cache entries, then check filter. - List<GridDhtCacheEntry> locked = lockEntries(keys, req.topologyVersion()); + List<GridDhtCacheEntry> locked = null; + GridDhtCacheEntry singleLocked = null; + + if (req.singleUpdate()) + singleLocked = lockEntry(req.singleKey(), req.topologyVersion()); + else + locked = lockEntries(req.keys(), req.topologyVersion()); Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null; @@ -1106,8 +1164,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { try { if (topology().stopping()) { - res.addFailedKeys(keys, new IgniteCheckedException("Failed to perform cache operation " + - "(cache is stopped): " + name())); + res.addFailedKeys(single ? Collections.singleton(req.singleKey()) : req.keys(), + new IgniteCheckedException("Failed to perform cache operation " + + "(cache is stopped): " + name())); completionCb.apply(req, res); @@ -1152,7 +1211,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheReturn retVal = null; - if (keys.size() > 1 && // Several keys ... + if (single) { + UpdateSingleResult updRes = updateSingleEntry(node, + hasNear, + req, + res, + singleLocked, + ver, + (GridDhtAtomicSingleUpdateFuture)dhtFut, + completionCb, + ctx.isDrEnabled(), + taskName, + expiry); + + retVal = updRes.returnValue(); + deleted = updRes.deleted(); + dhtFut = updRes.dhtFuture(); + } + else if (req.keys().size() > 1 && // Several keys ... writeThrough() && !req.skipStore() && // and store is enabled ... !ctx.store().isLocal() && // and this is not local store ... !ctx.dr().receiveEnabled() // and no DR. @@ -1164,7 +1240,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res, locked, ver, - dhtFut, + (GridDhtAtomicUpdateFuture)dhtFut, completionCb, ctx.isDrEnabled(), taskName, @@ -1183,7 +1259,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res, locked, ver, - dhtFut, + (GridDhtAtomicUpdateFuture)dhtFut, completionCb, ctx.isDrEnabled(), taskName, @@ -1216,9 +1292,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { e.printStackTrace(); } finally { - if (locked != null) - unlockEntries(locked, req.topologyVersion()); + if (single) { + if (singleLocked != null) + unlockSingleEntry(singleLocked, req.topologyVersion()); + } + else { + if (locked != null) + unlockEntries(locked, req.topologyVersion()); + } + // TODO // Enqueue if necessary after locks release. if (deleted != null) { assert !deleted.isEmpty(); @@ -1242,7 +1325,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // an attempt to use cleaned resources. U.error(log, "Unexpected exception during cache update", e); - res.addFailedKeys(keys, e); + res.addFailedKeys(single ? Collections.singleton(req.singleKey()) : req.keys(), e); completionCb.apply(req, res); @@ -1252,7 +1335,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (remap) { assert dhtFut == null; - res.remapKeys(req.keys()); + res.remapKeys(single ? Collections.singletonList(req.singleKey()) : req.keys()); completionCb.apply(req, res); } @@ -1777,7 +1860,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { taskName); if (dhtFut == null && !F.isEmpty(filteredReaders)) { - dhtFut = createDhtFuture(ver, req, res, completionCb, true); + dhtFut = (GridDhtAtomicUpdateFuture)createDhtFuture(ver, req, res, completionCb, true); readersOnly = true; } @@ -1889,6 +1972,187 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** + * Updates locked entries one-by-one. + * + * @param node Originating node. + * @param hasNear {@code True} if originating node has near cache. + * @param req Update request. + * @param res Update response. + * @param entry Locked entry. + * @param ver Assigned update version. + * @param dhtFut Optional DHT future. + * @param completionCb Completion callback to invoke when DHT future is completed. + * @param replicate Whether DR is enabled for that cache. + * @param taskName Task name. + * @param expiry Expiry policy. + * @return Return value. + * @throws GridCacheEntryRemovedException Should be never thrown. + */ + private UpdateSingleResult updateSingleEntry( + ClusterNode node, + boolean hasNear, + GridNearAtomicUpdateRequest req, + GridNearAtomicUpdateResponse res, + GridDhtCacheEntry entry, + GridCacheVersion ver, + @Nullable GridDhtAtomicSingleUpdateFuture dhtFut, + CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, + boolean replicate, + String taskName, + @Nullable IgniteCacheExpiryPolicy expiry + ) throws GridCacheEntryRemovedException { + GridCacheReturn retVal = null; + Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null; + + KeyCacheObject k = req.singleKey(); + + AffinityTopologyVersion topVer = req.topologyVersion(); + + boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer); + + boolean readersOnly = false; + + boolean intercept = ctx.config().getInterceptor() != null; + + GridCacheOperation op = req.operation(); + + // We are holding java-level locks on entries at this point. + // No GridCacheEntryRemovedException can be thrown. + try { + if (entry == null) + return new UpdateSingleResult(retVal, deleted, dhtFut);; + + boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.key(), + req.topologyVersion()); + + Object writeVal = op == TRANSFORM ? req.singleEntryProcessor() : req.singleWriteValue(); + + Collection<UUID> readers = null; + Collection<UUID> filteredReaders = null; + + if (checkReaders) { + readers = entry.readers(); + filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id())); + } + + GridCacheUpdateAtomicResult updRes = entry.innerUpdate( + ver, + node.id(), + locNodeId, + op, + writeVal, + req.invokeArguments(), + primary && writeThrough() && !req.skipStore(), + !req.skipStore(), + req.returnValue(), + expiry, + true, + true, + primary, + ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node. + topVer, + req.filter(), + replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE, + CU.TTL_NOT_CHANGED, + CU.EXPIRE_TIME_CALCULATE, + null, + true, + intercept, + req.subjectId(), + taskName); + + if (dhtFut == null && !F.isEmpty(filteredReaders)) { + dhtFut = (GridDhtAtomicSingleUpdateFuture)createDhtFuture(ver, req, res, completionCb, true); + + readersOnly = true; + } + + if (dhtFut != null) { + if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios. + assert updRes.conflictResolveResult() == null : updRes; + + if (!readersOnly) { + dhtFut.addWriteEntry(entry, + updRes.newValue(), + updRes.newTtl()); + } + + if (!F.isEmpty(filteredReaders)) + dhtFut.addNearWriteEntries(filteredReaders, + entry, + updRes.newValue(), + updRes.newTtl()); + } + else { + if (log.isDebugEnabled()) + log.debug("Entry did not pass the filter or conflict resolution (will skip write) " + + "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']'); + } + } + + if (hasNear) { + if (primary && updRes.sendToDht()) { + if (!ctx.affinity().belongs(node, entry.partition(), topVer)) { + // If put the same value as in request then do not need to send it back. + if (op == TRANSFORM || writeVal != updRes.newValue()) { + res.addNearValue(0, + updRes.newValue(), + updRes.newTtl(), + updRes.conflictExpireTime()); + } + else + res.addNearTtl(0, updRes.newTtl(), updRes.conflictExpireTime()); + + if (updRes.newValue() != null) { + IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer); + + assert f == null : f; + } + } + else if (F.contains(readers, node.id())) // Reader became primary or backup. + entry.removeReader(node.id(), req.messageId()); + else + res.addSkippedIndex(0); + } + else + res.addSkippedIndex(0); + } + + if (updRes.removeVersion() != null) + deleted = Collections.singleton(F.t(entry, updRes.removeVersion())); + + if (op == TRANSFORM) { + assert !req.returnValue(); + + IgniteBiTuple<Object, Exception> compRes = updRes.computedResult(); + + if (compRes != null && (compRes.get1() != null || compRes.get2() != null)) { + retVal = new GridCacheReturn(node.isLocal()); + + retVal.addEntryProcessResult(ctx, + k, + null, + compRes.get1(), + compRes.get2()); + } + } + else { + CacheObject ret = updRes.oldValue(); + + retVal = new GridCacheReturn(ctx, + node.isLocal(), + req.returnValue() ? ret : null, + updRes.success()); + } + } + catch (IgniteCheckedException e) { + res.addFailedKey(k, e); + } + + return new UpdateSingleResult(retVal, deleted, dhtFut); + } + + /** * @param hasNear {@code True} if originating node has near cache. * @param firstEntryIdx Index of the first entry in the request keys collection. * @param entries Entries to update. @@ -2067,7 +2331,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { batchRes.addDeleted(entry, updRes, entries); if (dhtFut == null && !F.isEmpty(filteredReaders)) { - dhtFut = createDhtFuture(ver, req, res, completionCb, true); + dhtFut = (GridDhtAtomicUpdateFuture)createDhtFuture(ver, req, res, completionCb, true); batchRes.readersOnly(true); } @@ -2139,6 +2403,29 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return dhtFut; } + private GridDhtCacheEntry lockEntry(KeyCacheObject key, AffinityTopologyVersion topVer) + throws GridDhtInvalidPartitionException { + while (true) { + try { + GridDhtCacheEntry entry = entryExx(key, topVer); + + UNSAFE.monitorEnter(entry); + + if (entry.obsolete()) + UNSAFE.monitorExit(entry); + else + return entry; + } + catch (GridDhtInvalidPartitionException e) { + // Ignore invalid partition exception in CLOCK ordering mode. + if (ctx.config().getAtomicWriteOrderMode() == CLOCK) + return null; + else + throw e; + } + } + } + /** * Acquires java-level locks on cache entries. Returns collection of locked entries. * @@ -2226,6 +2513,30 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } + private void unlockSingleEntry(GridDhtCacheEntry entry, AffinityTopologyVersion topVer) { + // Process deleted entries before locks release. + assert ctx.deferredDelete(this) : this; + + boolean skip = false; + + try { + if (entry.deleted()) + skip = true; + } + finally { + UNSAFE.monitorExit(entry); + } + + entry.onUnlock(); + + if (skip) + return; + + // Must touch all entries since update may have deleted entries. + // Eviction manager will remove empty entries. + ctx.evicts().touch(entry, topVer); + } + /** * Releases java-level locks on cache entries. * @@ -2376,7 +2687,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param force If {@code true} then creates future without optimizations checks. * @return Backup update future or {@code null} if there are no backups. */ - @Nullable private GridDhtAtomicUpdateFuture createDhtFuture( + @Nullable private GridCacheAtomicFuture createDhtFuture( GridCacheVersion writeVer, GridNearAtomicUpdateRequest updateReq, GridNearAtomicUpdateResponse updateRes, @@ -2404,7 +2715,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } - return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes); + return updateReq.singleUpdate() ? + new GridDhtAtomicSingleUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes) : + new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes); } /** @@ -2452,7 +2765,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.nodeId(ctx.localNodeId()); - GridNearAtomicUpdateFuture fut = (GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); + GridCacheAtomicFuture fut = (GridCacheAtomicFuture)ctx.mvcc().atomicFuture(res.futureVersion()); if (fut != null) fut.onResult(nodeId, res); @@ -2481,8 +2794,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); - for (int i = 0; i < req.size(); i++) { - KeyCacheObject key = req.key(i); + if (req.singleUpdate()) { + KeyCacheObject key = req.singleKey(); try { while (true) { @@ -2491,22 +2804,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { try { entry = entryExx(key); - CacheObject val = req.value(i); - EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i); + CacheObject val = req.singleValue(); - GridCacheOperation op = entryProcessor != null ? TRANSFORM : - (val != null) ? UPDATE : DELETE; + GridCacheOperation op = (val != null) ? UPDATE : DELETE; - long ttl = req.ttl(i); - long expireTime = req.conflictExpireTime(i); + long ttl = req.ttl(0); + long expireTime = req.conflictExpireTime(0); GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, nodeId, nodeId, op, - op == TRANSFORM ? entryProcessor : val, - op == TRANSFORM ? req.invokeArguments() : null, + val, + null, /*write-through*/false, /*read-through*/false, /*retval*/false, @@ -2520,7 +2831,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { replicate ? DR_BACKUP : DR_NONE, ttl, expireTime, - req.conflictVersion(i), + null, false, intercept, req.subjectId(), @@ -2552,6 +2863,79 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e)); } } + else { + for (int i = 0; i < req.size(); i++) { + KeyCacheObject key = req.key(i); + + try { + while (true) { + GridDhtCacheEntry entry = null; + + try { + entry = entryExx(key); + + CacheObject val = req.value(i); + EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i); + + GridCacheOperation op = entryProcessor != null ? TRANSFORM : + (val != null) ? UPDATE : DELETE; + + long ttl = req.ttl(i); + long expireTime = req.conflictExpireTime(i); + + GridCacheUpdateAtomicResult updRes = entry.innerUpdate( + ver, + nodeId, + nodeId, + op, + op == TRANSFORM ? entryProcessor : val, + op == TRANSFORM ? req.invokeArguments() : null, + /*write-through*/false, + /*read-through*/false, + /*retval*/false, + /*expiry policy*/null, + /*event*/true, + /*metrics*/true, + /*primary*/false, + /*check version*/!req.forceTransformBackups(), + req.topologyVersion(), + CU.empty0(), + replicate ? DR_BACKUP : DR_NONE, + ttl, + expireTime, + req.conflictVersion(i), + false, + intercept, + req.subjectId(), + taskName); + + if (updRes.removeVersion() != null) + ctx.onDeferredDelete(entry, updRes.removeVersion()); + + entry.onUnlock(); + + break; // While. + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry while updating backup value (will retry): " + key); + + entry = null; + } + finally { + if (entry != null) + ctx.evicts().touch(entry, req.topologyVersion()); + } + } + } + catch (GridDhtInvalidPartitionException ignored) { + // Ignore. + } + catch (IgniteCheckedException e) { + res.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e)); + } + } + } if (isNearEnabled(cacheCfg)) ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, res); @@ -2612,7 +2996,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (log.isDebugEnabled()) log.debug("Processing dht atomic update response [nodeId=" + nodeId + ", res=" + res + ']'); - GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc(). + GridCacheAtomicFuture updateFut = (GridCacheAtomicFuture)ctx.mvcc(). atomicFuture(res.futureVersion()); if (updateFut != null) @@ -2632,7 +3016,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { log.debug("Processing deferred dht atomic update response [nodeId=" + nodeId + ", res=" + res + ']'); for (GridCacheVersion ver : res.futureVersions()) { - GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(ver); + GridCacheAtomicFuture updateFut = (GridCacheAtomicFuture)ctx.mvcc().atomicFuture(ver); if (updateFut != null) updateFut.onResult(nodeId); @@ -2676,7 +3060,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private final Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted; /** */ - private final GridDhtAtomicUpdateFuture dhtFut; + private final GridCacheAtomicFuture dhtFut; /** * @param retVal Return value. @@ -2685,7 +3069,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ private UpdateSingleResult(GridCacheReturn retVal, Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted, - GridDhtAtomicUpdateFuture dhtFut) { + GridCacheAtomicFuture dhtFut) { this.retVal = retVal; this.deleted = deleted; this.dhtFut = dhtFut; @@ -2708,7 +3092,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * @return DHT future. */ - public GridDhtAtomicUpdateFuture dhtFuture() { + public GridCacheAtomicFuture dhtFuture() { return dhtFut; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java new file mode 100644 index 0000000..f31dda2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import java.util.Collection; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.CI2; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentHashMap8; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * DHT atomic cache backup update future. + */ +public class GridDhtAtomicSingleUpdateFuture extends GridFutureAdapter<Void> + implements GridCacheAtomicFuture<Void> { + /** */ + private static final long serialVersionUID = 0L; + + /** Logger reference. */ + private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + + /** Logger. */ + protected static IgniteLogger log; + + /** Cache context. */ + private GridCacheContext cctx; + + /** Future version. */ + private GridCacheVersion futVer; + + /** Write version. */ + private GridCacheVersion writeVer; + + /** Completion callback. */ + @GridToStringExclude + private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb; + + /** Mappings. */ + @GridToStringInclude + private ConcurrentMap<UUID, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>(); + + /** Entries with readers. */ + private GridDhtCacheEntry nearReaderEntry; + + /** Update request. */ + private GridNearAtomicUpdateRequest updateReq; + + /** Update response. */ + private GridNearAtomicUpdateResponse updateRes; + + /** */ + private boolean waitForExchange; + + /** + * @param cctx Cache context. + * @param completionCb Callback to invoke when future is completed. + * @param writeVer Write version. + * @param updateReq Update request. + * @param updateRes Update response. + */ + public GridDhtAtomicSingleUpdateFuture( + GridCacheContext cctx, + CI2<GridNearAtomicUpdateRequest, + GridNearAtomicUpdateResponse> completionCb, + GridCacheVersion writeVer, + GridNearAtomicUpdateRequest updateReq, + GridNearAtomicUpdateResponse updateRes + ) { + assert updateReq.singleUpdate() : updateReq; + + this.cctx = cctx; + this.writeVer = writeVer; + + futVer = cctx.versions().next(updateReq.topologyVersion()); + this.updateReq = updateReq; + this.completionCb = completionCb; + this.updateRes = updateRes; + + if (log == null) + log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicSingleUpdateFuture.class); + + boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()); + + waitForExchange = !topLocked; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid futureId() { + return futVer.asGridUuid(); + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion version() { + return futVer; + } + + /** {@inheritDoc} */ + @Override public Collection<? extends ClusterNode> nodes() { + return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull()); + } + + /** {@inheritDoc} */ + @Override public boolean onNodeLeft(UUID nodeId) { + if (log.isDebugEnabled()) + log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']'); + + GridDhtAtomicUpdateRequest req = mappings.get(nodeId); + + if (req != null) { + // Remove only after added keys to failed set. + mappings.remove(nodeId); + + checkComplete(); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean trackable() { + return true; + } + + /** {@inheritDoc} */ + @Override public void markNotTrackable() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { + if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 0) + return this; + + return null; + } + + /** {@inheritDoc} */ + @Override public Collection<KeyCacheObject> keys() { + return Collections.singleton(updateReq.singleKey()); + } + + /** + * @param entry Entry to map. + * @param val Value to write. + * @param ttl TTL (optional). + */ + public void addWriteEntry(GridDhtCacheEntry entry, + @Nullable CacheObject val, + long ttl) { + AffinityTopologyVersion topVer = updateReq.topologyVersion(); + + Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer); + + if (log.isDebugEnabled()) + log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']'); + + CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode(); + + for (ClusterNode node : dhtNodes) { + UUID nodeId = node.id(); + + if (!nodeId.equals(cctx.localNodeId())) { + GridDhtAtomicUpdateRequest updateReq = new GridDhtAtomicUpdateRequest( + cctx.cacheId(), + nodeId, + futVer, + writeVer, + syncMode, + topVer, + false, + this.updateReq.subjectId(), + this.updateReq.taskNameHash(), + null, + cctx.deploymentEnabled(), + true); + + mappings.put(nodeId, updateReq); + + updateReq.addSingleWriteValue(entry.key(), + val, + ttl); + } + } + } + + /** + * @param readers Entry readers. + * @param entry Entry. + * @param val Value. + * @param ttl TTL for near cache update (optional). + */ + public void addNearWriteEntries(Iterable<UUID> readers, + GridDhtCacheEntry entry, + @Nullable CacheObject val, + long ttl) { + CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode(); + + AffinityTopologyVersion topVer = updateReq.topologyVersion(); + + for (UUID nodeId : readers) { + GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId); + + + ClusterNode node = cctx.discovery().node(nodeId); + + // Node left the grid. + if (node == null) + continue; + + if (updateReq == null) { + updateReq = new GridDhtAtomicUpdateRequest( + cctx.cacheId(), + nodeId, + futVer, + writeVer, + syncMode, + topVer, + false, + this.updateReq.subjectId(), + this.updateReq.taskNameHash(), + null, + cctx.deploymentEnabled(), + false); + + mappings.put(nodeId, updateReq); + } + + nearReaderEntry = entry; + + updateReq.addNearWriteValue(entry.key(), + val, + null, + ttl, + -1L); + } + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + cctx.mvcc().removeAtomicFuture(version()); + + if (err != null) + updateRes.addFailedKey(updateReq.singleKey(), err); + + if (updateReq.writeSynchronizationMode() == FULL_SYNC) + completionCb.apply(updateReq, updateRes); + + return true; + } + + return false; + } + + /** + * Sends requests to remote nodes. + */ + public void map() { + if (!mappings.isEmpty()) { + for (GridDhtAtomicUpdateRequest req : mappings.values()) { + try { + if (log.isDebugEnabled()) + log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); + + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + } + catch (ClusterTopologyCheckedException ignored) { + U.warn(log, "Failed to send update request to backup node because it left grid: " + + req.nodeId()); + + mappings.remove(req.nodeId()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send update request to backup node (did node leave the grid?): " + + req.nodeId(), e); + + mappings.remove(req.nodeId()); + } + } + } + + checkComplete(); + + // Send response right away if no ACKs from backup is required. + // Backups will send ACKs anyway, future will be completed after all backups have replied. + if (updateReq.writeSynchronizationMode() != FULL_SYNC) + completionCb.apply(updateReq, updateRes); + } + + /** + * Callback for backup update response. + * + * @param nodeId Backup node ID. + * @param updateRes Update response. + */ + public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) { + if (log.isDebugEnabled()) + log.debug("Received DHT atomic update future result [nodeId=" + nodeId + ", updateRes=" + updateRes + ']'); + + if (updateRes.error() != null) + this.updateRes.addFailedKeys(updateRes.failedKeys(), updateRes.error()); + + if (!F.isEmpty(updateRes.nearEvicted())) { + assert nearReaderEntry != null; + + try { + nearReaderEntry.removeReader(nodeId, updateRes.messageId()); + } + catch (GridCacheEntryRemovedException e) { + if (log.isDebugEnabled()) + log.debug("Entry with evicted reader was removed [entry=" + nearReaderEntry + ", err=" + e + ']'); + } + } + + mappings.remove(nodeId); + + checkComplete(); + } + + @Override + public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) { + assert false; + + throw new UnsupportedOperationException(); + } + + /** + * Deferred update response. + * + * @param nodeId Backup node ID. + */ + public void onResult(UUID nodeId) { + if (log.isDebugEnabled()) + log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']'); + + mappings.remove(nodeId); + + checkComplete(); + } + + /** + * Checks if all required responses are received. + */ + private void checkComplete() { + // Always wait for replies from all backups. + if (mappings.isEmpty()) { + if (log.isDebugEnabled()) + log.debug("Completing DHT atomic update future: " + this); + + onDone(); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtAtomicSingleUpdateFuture.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 4ace5c4..f0635e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -234,7 +234,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> this.updateReq.subjectId(), this.updateReq.taskNameHash(), forceTransformBackups ? this.updateReq.invokeArguments() : null, - cctx.deploymentEnabled()); + cctx.deploymentEnabled(), + false); mappings.put(nodeId, updateReq); } @@ -290,7 +291,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> this.updateReq.subjectId(), this.updateReq.taskNameHash(), forceTransformBackups ? this.updateReq.invokeArguments() : null, - cctx.deploymentEnabled()); + cctx.deploymentEnabled(), + false); mappings.put(nodeId, updateReq); } @@ -394,6 +396,13 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> checkComplete(); } + @Override + public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) { + assert false; + + throw new UnsupportedOperationException(); + } + /** * Deferred update response. * http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index e55cac9..242c373 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -78,6 +78,10 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid @GridDirectCollection(CacheObject.class) private List<CacheObject> vals; + private KeyCacheObject singleKey; + + private CacheObject singleVal; + /** Conflict versions. */ @GridDirectCollection(GridCacheVersion.class) private List<GridCacheVersion> conflictVers; @@ -172,7 +176,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid UUID subjId, int taskNameHash, Object[] invokeArgs, - boolean addDepInfo + boolean addDepInfo, + boolean single ) { assert invokeArgs == null || forceTransformBackups; @@ -188,16 +193,20 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid this.invokeArgs = invokeArgs; this.addDepInfo = addDepInfo; - keys = new ArrayList<>(); + if (!single) { + keys = new ArrayList<>(); - if (forceTransformBackups) { - entryProcessors = new ArrayList<>(); - entryProcessorsBytes = new ArrayList<>(); + if (forceTransformBackups) { + entryProcessors = new ArrayList<>(); + entryProcessorsBytes = new ArrayList<>(); + } + else + vals = new ArrayList<>(); } - else - vals = new ArrayList<>(); } + + /** * @return Force transform backups flag. */ @@ -205,6 +214,36 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid return forceTransformBackups; } + public KeyCacheObject singleKey() { + assert singleKey != null; + + return singleKey; + } + + public CacheObject singleValue() { + return singleVal; + } + + public boolean singleUpdate() { + return singleKey != null; + } + + public void addSingleWriteValue(KeyCacheObject key, + @Nullable CacheObject val, + long ttl) { + assert !forceTransformBackups; + + singleKey = key; + singleVal = val; + + if (ttl >= 0) { + if (ttls == null) + ttls = new GridLongList(1); + + ttls.add(ttl); + } + } + /** * @param key Key to add. * @param val Value, {@code null} if should be removed. @@ -276,10 +315,10 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid * @param expireTime Expire time. */ public void addNearWriteValue(KeyCacheObject key, - @Nullable CacheObject val, - EntryProcessor<Object, Object, Object> entryProcessor, - long ttl, - long expireTime) + @Nullable CacheObject val, + EntryProcessor<Object, Object, Object> entryProcessor, + long ttl, + long expireTime) { if (nearKeys == null) { nearKeys = new ArrayList<>(); @@ -540,24 +579,37 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid GridCacheContext cctx = ctx.cacheContext(cacheId); - prepareMarshalCacheObjects(keys, cctx); + if (singleKey != null) { + assert !forceTransformBackups; - prepareMarshalCacheObjects(vals, cctx); + prepareMarshalCacheObject(singleKey, cctx); - prepareMarshalCacheObjects(nearKeys, cctx); + prepareMarshalCacheObject(singleVal, cctx); - prepareMarshalCacheObjects(nearVals, cctx); + prepareMarshalCacheObjects(nearKeys, cctx); - if (forceTransformBackups) { - // force addition of deployment info for entry processors if P2P is enabled globally. - if (!addDepInfo && ctx.deploymentEnabled()) - addDepInfo = true; + prepareMarshalCacheObjects(nearVals, cctx); + } + else { + prepareMarshalCacheObjects(keys, cctx); + + prepareMarshalCacheObjects(vals, cctx); + + prepareMarshalCacheObjects(nearKeys, cctx); + + prepareMarshalCacheObjects(nearVals, cctx); - invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx); + if (forceTransformBackups) { + // force addition of deployment info for entry processors if P2P is enabled globally. + if (!addDepInfo && ctx.deploymentEnabled()) + addDepInfo = true; + + invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx); - entryProcessorsBytes = marshalCollection(entryProcessors, cctx); + entryProcessorsBytes = marshalCollection(entryProcessors, cctx); - nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, cctx); + nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, cctx); + } } } @@ -567,22 +619,29 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid GridCacheContext cctx = ctx.cacheContext(cacheId); - finishUnmarshalCacheObjects(keys, cctx, ldr); + if (singleKey != null) { + finishUnmarshalCacheObject(singleKey, cctx, ldr); - finishUnmarshalCacheObjects(vals, cctx, ldr); + finishUnmarshalCacheObject(singleVal, cctx, ldr); + } + else { + finishUnmarshalCacheObjects(keys, cctx, ldr); - finishUnmarshalCacheObjects(nearKeys, cctx, ldr); + finishUnmarshalCacheObjects(vals, cctx, ldr); - finishUnmarshalCacheObjects(nearVals, cctx, ldr); + finishUnmarshalCacheObjects(nearKeys, cctx, ldr); - if (forceTransformBackups) { - entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr); + finishUnmarshalCacheObjects(nearVals, cctx, ldr); - invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr); - } + if (forceTransformBackups) { + entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr); + + invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr); + } - if (forceTransformBackups) - nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr); + if (forceTransformBackups) + nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr); + } } /** {@inheritDoc} */ @@ -684,42 +743,54 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid writer.incrementState(); case 16: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeMessage("singleKey", singleKey)) return false; writer.incrementState(); case 17: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + if (!writer.writeMessage("singleVal", singleVal)) return false; writer.incrementState(); case 18: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 19: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); case 20: - if (!writer.writeMessage("ttls", ttls)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 21: - if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 22: + if (!writer.writeMessage("ttls", ttls)) + return false; + + writer.incrementState(); + + case 23: + if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 24: if (!writer.writeMessage("writeVer", writeVer)) return false; @@ -846,7 +917,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 16: - subjId = reader.readUuid("subjId"); + singleKey = reader.readMessage("singleKey"); if (!reader.isLastRead()) return false; @@ -854,6 +925,22 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 17: + singleVal = reader.readMessage("singleVal"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 18: + subjId = reader.readUuid("subjId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 19: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -865,7 +952,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 18: + case 20: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -873,7 +960,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 19: + case 21: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -881,7 +968,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 20: + case 22: ttls = reader.readMessage("ttls"); if (!reader.isLastRead()) @@ -889,7 +976,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 21: + case 23: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -897,7 +984,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 22: + case 24: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -917,7 +1004,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 23; + return 25; } /** {@inheritDoc} */
