ignite-5272 For atomic updates, do not use GridDiscoveryManager.hasNearCache; instead, always check readers on the primary and update the expected mapping on the near node.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/851b9ad5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/851b9ad5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/851b9ad5 Branch: refs/heads/ignite-5272 Commit: 851b9ad584097bb9aa0ebc984100aec6b95d0965 Parents: 5d98cce Author: sboikov <[email protected]> Authored: Sat Jun 10 12:44:15 2017 +0300 Committer: sboikov <[email protected]> Committed: Sat Jun 10 12:44:15 2017 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtCacheEntry.java | 30 ++- .../GridDhtAtomicAbstractUpdateFuture.java | 76 +++++-- .../GridDhtAtomicAbstractUpdateRequest.java | 22 +- .../dht/atomic/GridDhtAtomicCache.java | 45 ++--- .../atomic/GridDhtAtomicSingleUpdateFuture.java | 6 +- .../GridDhtAtomicSingleUpdateRequest.java | 14 ++ .../dht/atomic/GridDhtAtomicUpdateFuture.java | 4 +- ...idNearAtomicAbstractSingleUpdateRequest.java | 19 +- .../GridNearAtomicAbstractUpdateFuture.java | 200 ++++++++++++++----- .../GridNearAtomicAbstractUpdateRequest.java | 86 ++++++-- .../atomic/GridNearAtomicFullUpdateRequest.java | 24 +-- ...GridNearAtomicSingleUpdateFilterRequest.java | 23 +-- .../GridNearAtomicSingleUpdateFuture.java | 42 ++-- ...GridNearAtomicSingleUpdateInvokeRequest.java | 23 +-- .../GridNearAtomicSingleUpdateRequest.java | 20 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 46 +++-- .../atomic/GridNearAtomicUpdateResponse.java | 24 +-- .../atomic/IgniteCacheAtomicProtocolTest.java | 128 ++++++++++++ 18 files changed, 548 insertions(+), 284 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 2e86fb0..05831b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -625,6 +625,15 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { } /** + * @return Readers. + */ + @Nullable public ReaderId[] readersLocked() { + assert Thread.holdsLock(this); + + return this.rdrs; + } + + /** * @return Collection of readers after check. * @throws GridCacheEntryRemovedException If removed. */ @@ -739,7 +748,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { /** * Reader ID. */ - private static class ReaderId { + public static class ReaderId { /** */ private static final ReaderId[] EMPTY_ARRAY = new ReaderId[0]; @@ -765,9 +774,26 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { } /** + * @param readers Readers array. + * @param nodeId Node ID to check. + * @return {@code True} if node ID found in readers array. + */ + public static boolean contains(@Nullable ReaderId[] readers, UUID nodeId) { + if (readers == null) + return false; + + for (int i = 0; i < readers.length; i++) { + if (nodeId.equals(readers[i].nodeId)) + return true; + } + + return false; + } + + /** * @return Node ID. 
*/ - UUID nodeId() { + public UUID nodeId() { return nodeId; } http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java index 5c7c027..fb58e71 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java @@ -92,6 +92,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA /** Response count. */ private volatile int resCnt; + /** */ + private boolean addedReader; + /** * @param cctx Cache context. * @param writeVer Write version. @@ -175,7 +178,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA List<ClusterNode> affNodes = affAssignment.get(entry.partition()); - List<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), affAssignment, affNodes); + // Client has seen that rebalancing finished, it is safe to use affinity mapping. + List<ClusterNode> dhtNodes = updateReq.affinityMapping() ? + affNodes : cctx.dht().topology().nodes(entry.partition(), affAssignment, affNodes); if (dhtNodes == null) dhtNodes = affNodes; @@ -232,9 +237,10 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA * @param key Key. * @param readers Near cache readers. 
*/ - protected abstract void addNearKey(KeyCacheObject key, Collection<UUID> readers); + protected abstract void addNearKey(KeyCacheObject key, GridDhtCacheEntry.ReaderId[] readers); /** + * @param nearNode Near node. * @param readers Entry readers. * @param entry Entry. * @param val Value. @@ -243,27 +249,43 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA * @param expireTime Expire time for near cache update (optional). */ final void addNearWriteEntries( - Collection<UUID> readers, + ClusterNode nearNode, + GridDhtCacheEntry.ReaderId[] readers, GridDhtCacheEntry entry, @Nullable CacheObject val, EntryProcessor<Object, Object, Object> entryProcessor, long ttl, long expireTime) { + assert readers != null; + CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode(); addNearKey(entry.key(), readers); AffinityTopologyVersion topVer = updateReq.topologyVersion(); - for (UUID nodeId : readers) { - GridDhtAtomicAbstractUpdateRequest updateReq = mappings.get(nodeId); + for (int i = 0; i < readers.length; i++) { + GridDhtCacheEntry.ReaderId reader = readers[i]; + + if (nearNode.id().equals(reader.nodeId())) + continue; + + GridDhtAtomicAbstractUpdateRequest updateReq = mappings.get(reader.nodeId()); if (updateReq == null) { - ClusterNode node = cctx.discovery().node(nodeId); + ClusterNode node = cctx.discovery().node(reader.nodeId()); // Node left the grid. - if (node == null) + if (node == null) { + try { + entry.removeReader(reader.nodeId(), -1L); + } + catch (GridCacheEntryRemovedException ignore) { + assert false; // Assume hold entry lock. 
+ } + continue; + } updateReq = createRequest( node.id(), @@ -275,7 +297,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA expireTime, null); - mappings.put(nodeId, updateReq); + mappings.put(node.id(), updateReq); + + addedReader = true; } updateReq.addNearWriteValue(entry.key(), @@ -361,7 +385,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA GridNearAtomicUpdateResponse updateRes, GridDhtAtomicCache.UpdateReplyClosure completionCb) { if (F.isEmpty(mappings)) { - updateRes.dhtNodes(Collections.<UUID>emptyList()); + updateRes.mapping(Collections.<UUID>emptyList()); completionCb.apply(updateReq, updateRes); @@ -372,18 +396,31 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA boolean needReplyToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC || !ret.emptyResult() || - updateRes.nearVersion() != null || + updateReq.nearCache() || cctx.localNodeId().equals(nearNode.id()); boolean needMapping = updateReq.fullSync() && (updateReq.needPrimaryResponse() || !sendAllToDht()); - if (needMapping) { + boolean readersOnlyNodes = false; + + if (!updateReq.needPrimaryResponse() && addedReader) { + for (GridDhtAtomicAbstractUpdateRequest dhtReq : mappings.values()) { + if (dhtReq.nearSize() > 0 && dhtReq.size() == 0) { + readersOnlyNodes = true; + + break; + } + } + } + + if (needMapping || readersOnlyNodes) { initMapping(updateRes); needReplyToNear = true; } - sendDhtRequests(nearNode, ret); + // If there are readers updates then nearNode should not finish before primary response received. + sendDhtRequests(nearNode, ret, !readersOnlyNodes); if (needReplyToNear) completionCb.apply(updateReq, updateRes); @@ -393,24 +430,25 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA * @param updateRes Response. 
*/ private void initMapping(GridNearAtomicUpdateResponse updateRes) { - List<UUID> dhtNodes; + List<UUID> mapping; if (!F.isEmpty(mappings)) { - dhtNodes = new ArrayList<>(mappings.size()); + mapping = new ArrayList<>(mappings.size()); - dhtNodes.addAll(mappings.keySet()); + mapping.addAll(mappings.keySet()); } else - dhtNodes = Collections.emptyList(); + mapping = Collections.emptyList(); - updateRes.dhtNodes(dhtNodes); + updateRes.mapping(mapping); } /** * @param nearNode Near node. + * @param sndRes {@code True} if allow to send result from DHT nodes. * @param ret Return value. */ - private void sendDhtRequests(ClusterNode nearNode, GridCacheReturn ret) { + private void sendDhtRequests(ClusterNode nearNode, GridCacheReturn ret, boolean sndRes) { for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) { try { assert !cctx.localNodeId().equals(req.nodeId()) : req; @@ -418,7 +456,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA if (updateReq.fullSync()) { req.nearReplyInfo(nearNode.id(), updateReq.futureId()); - if (ret.emptyResult()) + if (sndRes && ret.emptyResult()) req.hasResult(true); } http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java index d2dc817..a50b68c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java @@ -42,13 +42,13 @@ import org.jetbrains.annotations.Nullable; */ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheIdMessage implements GridCacheDeployable { /** Skip store flag bit mask. */ - private static final int DHT_ATOMIC_SKIP_STORE_FLAG_MASK = 0x01; + protected static final int DHT_ATOMIC_SKIP_STORE_FLAG_MASK = 0x01; /** Keep binary flag. */ - private static final int DHT_ATOMIC_KEEP_BINARY_FLAG_MASK = 0x02; + protected static final int DHT_ATOMIC_KEEP_BINARY_FLAG_MASK = 0x02; /** Near cache key flag. */ - private static final int DHT_ATOMIC_NEAR_FLAG_MASK = 0x04; + protected static final int DHT_ATOMIC_NEAR_FLAG_MASK = 0x04; /** */ static final int DHT_ATOMIC_HAS_RESULT_MASK = 0x08; @@ -451,20 +451,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheIdMess @Nullable public abstract Object[] invokeArguments(); /** - * @return {@code True} if near cache update request. - */ - protected final boolean near() { - return isFlag(DHT_ATOMIC_NEAR_FLAG_MASK); - } - - /** - * @param near Near cache update flag. - */ - protected final void near(boolean near) { - setFlag(near, DHT_ATOMIC_NEAR_FLAG_MASK); - } - - /** * Sets flag mask. * * @param flag Set or clear. 
@@ -663,7 +649,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheIdMess appendFlag(flags, "skipStore"); if (keepBinary()) appendFlag(flags, "keepBinary"); - if (near()) + if (isFlag(DHT_ATOMIC_NEAR_FLAG_MASK)) appendFlag(flags, "near"); if (hasResult()) appendFlag(flags, "hasRes"); http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 443b1b1..67e3ebc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1796,7 +1796,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { locked = lockEntries(req, req.topologyVersion()); - boolean hasNear = ctx.discovery().cacheNearNode(node, name()); + boolean hasNear = req.nearCache(); // Assign next version for update inside entries lock. GridCacheVersion ver = ctx.versions().next(top.topologyVersion()); @@ -2406,8 +2406,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { AffinityTopologyVersion topVer = req.topologyVersion(); - boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer); - boolean intercept = ctx.config().getInterceptor() != null; AffinityAssignment affAssignment = ctx.affinity().assignment(topVer); @@ -2431,13 +2429,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { Object writeVal = op == TRANSFORM ? 
req.entryProcessor(i) : req.writeValue(i); - Collection<UUID> readers = null; - Collection<UUID> filteredReaders = null; - - if (checkReaders) { - readers = entry.readers(); - filteredReaders = F.view(entry.readers(), F.notEqualTo(nearNode.id())); - } + // Get readers before innerUpdate (reader cleared after remove). + GridDhtCacheEntry.ReaderId[] readers = entry.readersLocked(); GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, @@ -2492,9 +2485,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { updRes.oldValue(), updRes.updateCounter()); - if (!F.isEmpty(filteredReaders)) + if (readers != null) dhtFut.addNearWriteEntries( - filteredReaders, + nearNode, + readers, entry, updRes.newValue(), entryProcessor, @@ -2522,13 +2516,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.addNearTtl(i, updRes.newTtl(), updRes.conflictExpireTime()); if (updRes.newValue() != null) { - IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer); + IgniteInternalFuture<Boolean> f = + entry.addReader(nearNode.id(), req.messageId(), topVer); assert f == null : f; } } - else if (F.contains(readers, nearNode.id())) // Reader became primary or backup. + else if (GridDhtCacheEntry.ReaderId.contains(readers, nearNode.id())) { + // Reader became primary or backup. 
entry.removeReader(nearNode.id(), req.messageId()); + } else res.addSkippedIndex(i); } @@ -2627,8 +2624,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { AffinityTopologyVersion topVer = req.topologyVersion(); - boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer); - CacheStorePartialUpdateException storeErr = null; try { @@ -2688,13 +2683,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { assert writeVal != null || op == DELETE : "null write value found."; - Collection<UUID> readers = null; - Collection<UUID> filteredReaders = null; - - if (checkReaders) { - readers = entry.readers(); - filteredReaders = F.view(entry.readers(), F.notEqualTo(nearNode.id())); - } + // Get readers before innerUpdate (reader cleared after remove). + GridDhtCacheEntry.ReaderId[] readers = entry.readersLocked(); GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, @@ -2764,9 +2754,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { updRes.oldValue(), updRes.updateCounter()); - if (!F.isEmpty(filteredReaders)) + if (readers != null) dhtFut.addNearWriteEntries( - filteredReaders, + nearNode, + readers, entry, writeVal, entryProcessor, @@ -2793,8 +2784,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { assert f == null : f; } } - else if (readers.contains(nearNode.id())) // Reader became primary or backup. + else if (GridDhtCacheEntry.ReaderId.contains(readers, nearNode.id())) { + // Reader became primary or backup. 
entry.removeReader(nearNode.id(), req.messageId()); + } else res.addSkippedIndex(firstEntryIdx + i); } http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java index f053d21..595e455 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; -import java.util.Collection; import java.util.List; import java.util.UUID; import org.apache.ignite.cache.CacheWriteSynchronizationMode; @@ -25,6 +24,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -67,9 +67,9 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture } /** {@inheritDoc} */ - @Override protected void addNearKey(KeyCacheObject key, Collection<UUID> readers) { + @Override protected void addNearKey(KeyCacheObject 
key, GridDhtCacheEntry.ReaderId[] readers) { if (mappings == null) - mappings = U.newHashMap(readers.size()); + mappings = U.newHashMap(readers.length); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java index 8931c24..0ade243 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java @@ -147,6 +147,20 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat } /** + * @return {@code True} if near cache update request. + */ + private boolean near() { + return isFlag(DHT_ATOMIC_NEAR_FLAG_MASK); + } + + /** + * @param near Near cache update flag. + */ + private void near(boolean near) { + setFlag(near, DHT_ATOMIC_NEAR_FLAG_MASK); + } + + /** * @param key Key to add. * @param val Value, {@code null} if should be removed. * @param entryProcessor Entry processor. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 2a84445..2124330 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; -import java.util.Collection; import java.util.List; import java.util.UUID; import org.apache.ignite.cache.CacheWriteSynchronizationMode; @@ -25,6 +24,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -66,7 +66,7 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture { } /** {@inheritDoc} */ - @Override protected void addNearKey(KeyCacheObject key, Collection<UUID> readers) { + @Override protected void addNearKey(KeyCacheObject key, GridDhtCacheEntry.ReaderId[] readers) { // No-op. 
} http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java index 9a6909a..7022561 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java @@ -51,14 +51,11 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear * @param nodeId Node ID. * @param futId Future ID. * @param topVer Topology version. - * @param topLocked Topology locked flag. * @param syncMode Synchronization mode. * @param op Cache update operation. - * @param retval Return value required flag. * @param subjId Subject ID. * @param taskNameHash Task name hash code. - * @param skipStore Skip write-through to a persistent storage. - * @param keepBinary Keep binary flag. + * @param flags Flags. * @param addDepInfo Deployment info flag. 
*/ protected GridNearAtomicAbstractSingleUpdateRequest( @@ -66,32 +63,22 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear UUID nodeId, long futId, @NotNull AffinityTopologyVersion topVer, - boolean topLocked, CacheWriteSynchronizationMode syncMode, GridCacheOperation op, - boolean retval, @Nullable UUID subjId, int taskNameHash, - boolean mappingKnown, - boolean skipStore, - boolean keepBinary, - boolean recovery, + byte flags, boolean addDepInfo ) { super(cacheId, nodeId, futId, topVer, - topLocked, syncMode, op, - retval, subjId, taskNameHash, - mappingKnown, - skipStore, - keepBinary, - recovery, + flags, addDepInfo); } http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index 6969971..1bd8ec5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -20,9 +20,10 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; @@ -473,17 +474,41 @@ public abstract class 
GridNearAtomicAbstractUpdateFuture extends GridCacheFuture /** * */ + static class NodeResult { + /** */ + boolean rcvd; + + /** + * @param rcvd Result received flag. + */ + NodeResult(boolean rcvd) { + this.rcvd = rcvd; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "Result [rcvd=" + rcvd + ']'; + } + } + + /** + * + */ static class PrimaryRequestState { /** */ final GridNearAtomicAbstractUpdateRequest req; /** */ @GridToStringInclude - Set<UUID> dhtNodes; + Map<UUID, NodeResult> mappedNodes; + + /** */ + @GridToStringInclude + private int expCnt = -1; /** */ @GridToStringInclude - private Set<UUID> rcvd; + private int rcvdCnt; /** */ private boolean hasRes; @@ -501,24 +526,39 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture if (req.initMappingLocally()) { if (single) { if (nodes.size() > 1) { - dhtNodes = U.newHashSet(nodes.size() - 1); + mappedNodes = U.newHashMap(nodes.size() - 1); for (int i = 1; i < nodes.size(); i++) - dhtNodes.add(nodes.get(i).id()); + mappedNodes.put(nodes.get(i).id(), new NodeResult(false)); } else - dhtNodes = Collections.emptySet(); + mappedNodes = Collections.emptyMap(); } else { - dhtNodes = new HashSet<>(); + mappedNodes = new HashMap<>(); for (int i = 1; i < nodes.size(); i++) - dhtNodes.add(nodes.get(i).id()); + mappedNodes.put(nodes.get(i).id(), new NodeResult(false)); } + + expCnt = mappedNodes.size(); } } /** + * + */ + void resetLocalMapping() { + assert req.initMappingLocally() : req; + + mappedNodes = null; + + expCnt = -1; + + req.needPrimaryResponse(true); + } + + /** * @return Primary node ID. 
*/ UUID primaryId() { @@ -532,7 +572,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture assert req.initMappingLocally(); for (int i = 1; i < nodes.size(); i++) - dhtNodes.add(nodes.get(i).id()); + mappedNodes.put(nodes.get(i).id(), new NodeResult(false)); + + expCnt = mappedNodes.size(); } /** @@ -547,11 +589,18 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture boolean finished = false; - for (Iterator<UUID> it = dhtNodes.iterator(); it.hasNext();) { - UUID nodeId = it.next(); + for (Map.Entry<UUID, NodeResult> e : mappedNodes.entrySet()) { + NodeResult res = e.getValue(); + + if (res.rcvd) + continue; + + UUID nodeId = e.getKey(); if (!cctx.discovery().alive(nodeId)) { - it.remove(); + res.rcvd = true; + + rcvdCnt++; if (finished()) { finished = true; @@ -564,7 +613,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture if (finished) return DhtLeftResult.DONE; - if (dhtNodes.isEmpty()) + if (rcvdCnt == expCnt) return !req.needPrimaryResponse() ? DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE; return DhtLeftResult.NOT_DONE; @@ -577,7 +626,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture if (req.writeSynchronizationMode() == PRIMARY_SYNC) return hasRes; - return (dhtNodes != null && dhtNodes.isEmpty()) && hasRes; + return (expCnt == rcvdCnt) && hasRes; } /** @@ -622,14 +671,23 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture * @return {@code True} if request processing finished. */ DhtLeftResult onDhtNodeLeft(UUID nodeId) { - if (req.writeSynchronizationMode() != FULL_SYNC || dhtNodes == null || finished()) + if (req.writeSynchronizationMode() != FULL_SYNC || mappedNodes == null || finished()) return DhtLeftResult.NOT_DONE; - if (dhtNodes.remove(nodeId) && dhtNodes.isEmpty()) { - if (hasRes) - return DhtLeftResult.DONE; - else - return !req.needPrimaryResponse() ? 
DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE; + NodeResult res = mappedNodes.get(nodeId); + + if (res != null && !res.rcvd) { + res.rcvd = true; + + rcvdCnt++; + + if (rcvdCnt == expCnt) { + if (hasRes) + return DhtLeftResult.DONE; + else + return !req.needPrimaryResponse() ? + DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE; + } } return DhtLeftResult.NOT_DONE; @@ -649,16 +707,38 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture if (res.hasResult()) hasRes = true; - if (dhtNodes == null) { - if (rcvd == null) - rcvd = new HashSet<>(); + if (mappedNodes == null) { + assert expCnt == -1 : expCnt; + + mappedNodes = new HashMap<>(); + + mappedNodes.put(nodeId, new NodeResult(true)); - rcvd.add(nodeId); + rcvdCnt++; return false; } - return dhtNodes.remove(nodeId) && finished(); + NodeResult nodeRes = mappedNodes.get(nodeId); + + if (nodeRes != null) { + if (nodeRes.rcvd) + return false; + + nodeRes.rcvd = true; + + rcvdCnt++; + } + else { + if (!hasRes) // Do not finish future until primary response received and mapping is known. + expCnt = -1; + + mappedNodes.put(nodeId, new NodeResult(true)); + + rcvdCnt++; + } + + return finished(); } /** @@ -676,15 +756,15 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture assert onRes; if (res.error() != null || res.remapTopologyVersion() != null) { - dhtNodes = Collections.emptySet(); // Mark as finished. + expCnt = -1; // Mark as finished. return true; } assert res.returnValue() != null : res; - if (res.dhtNodes() != null) - initDhtNodes(res.dhtNodes(), cctx); + if (res.mapping() != null) + initMapping(res.mapping(), cctx); return finished(); } @@ -693,39 +773,61 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture * @param nodeIds Node IDs. * @param cctx Context. 
*/ - private void initDhtNodes(List<UUID> nodeIds, GridCacheContext cctx) { - assert dhtNodes == null || req.initMappingLocally(); + private void initMapping(List<UUID> nodeIds, GridCacheContext cctx) { + assert rcvdCnt <= nodeIds.size(); - Set<UUID> dhtNodes0 = dhtNodes; + expCnt = nodeIds.size(); - dhtNodes = null; + if (mappedNodes == null) + mappedNodes = U.newHashMap(nodeIds.size()); - for (UUID dhtNodeId : nodeIds) { - if (F.contains(rcvd, dhtNodeId)) - continue; + for (int i = 0; i < nodeIds.size(); i++) { + UUID nodeId = nodeIds.get(i); - if (req.initMappingLocally() && !F.contains(dhtNodes0, dhtNodeId)) - continue; + if (!mappedNodes.containsKey(nodeId)) { + NodeResult res = new NodeResult(false); - if (cctx.discovery().node(dhtNodeId) != null) { - if (dhtNodes == null) - dhtNodes = U.newHashSet(nodeIds.size()); + mappedNodes.put(nodeId, res); - dhtNodes.add(dhtNodeId); + if (cctx.discovery().node(nodeId) == null) { + res.rcvd = true; + + rcvdCnt++; + } } } - - if (dhtNodes == null) - dhtNodes = Collections.emptySet(); } /** {@inheritDoc} */ @Override public String toString() { - return S.toString(PrimaryRequestState.class, this, - "primary", primaryId(), - "needPrimaryRes", req.needPrimaryResponse(), - "primaryRes", req.response() != null, - "done", finished()); + Set<UUID> rcvd = null; + Set<UUID> nonRcvd = null; + + if (mappedNodes != null) { + for (Map.Entry<UUID, NodeResult> e : mappedNodes.entrySet()) { + if (e.getValue().rcvd) { + if (rcvd == null) + rcvd = new HashSet<>(); + + rcvd.add(e.getKey()); + } + else { + if (nonRcvd == null) + nonRcvd = new HashSet<>(); + + nonRcvd.add(e.getKey()); + } + } + } + + return "Primary [id=" + primaryId() + + ", opRes=" + hasRes + + ", expCnt=" + expCnt + + ", rcvdCnt=" + rcvdCnt + + ", primaryRes=" + (req.response() != null) + + ", done=" + finished() + + ", waitFor=" + nonRcvd + + ", rcvd=" + rcvd + ']'; } } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java index bb47af4..62618f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java @@ -66,6 +66,12 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheIdMes /** Recovery value flag. */ private static final int RECOVERY_FLAG_MASK = 0x20; + /** */ + private static final int NEAR_CACHE_FLAG_MASK = 0x40; + + /** */ + private static final int AFFINITY_MAPPING_FLAG_MASK = 0x80; + /** Target node ID. */ @GridDirectTransient protected UUID nodeId; @@ -110,15 +116,11 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheIdMes * @param nodeId Node ID. * @param futId Future ID. * @param topVer Topology version. - * @param topLocked Topology locked flag. * @param syncMode Synchronization mode. * @param op Cache update operation. - * @param retval Return value required flag. * @param subjId Subject ID. * @param taskNameHash Task name hash code. - * @param needPrimaryRes {@code True} if near node waits for primary response. - * @param skipStore Skip write-through to a persistent storage. - * @param keepBinary Keep binary flag. + * @param flags Flags. * @param addDepInfo Deployment info flag. 
*/ protected GridNearAtomicAbstractUpdateRequest( @@ -126,16 +128,11 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheIdMes UUID nodeId, long futId, @NotNull AffinityTopologyVersion topVer, - boolean topLocked, CacheWriteSynchronizationMode syncMode, GridCacheOperation op, - boolean retval, @Nullable UUID subjId, int taskNameHash, - boolean needPrimaryRes, - boolean skipStore, - boolean keepBinary, - boolean recovery, + byte flags, boolean addDepInfo ) { this.cacheId = cacheId; @@ -146,20 +143,73 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheIdMes this.op = op; this.subjId = subjId; this.taskNameHash = taskNameHash; + this.flags = flags; this.addDepInfo = addDepInfo; + } + + /** + * @param nearCache {@code True} if near cache enabled on originating node. + * @param topLocked Topology locked flag. + * @param retval Return value required flag. + * @param affMapping {@code True} if originating node detected that rebalancing finished and + * expects that update is mapped using current affinity. + * @param needPrimaryRes {@code True} if near node waits for primary response. + * @param skipStore Skip write-through to a CacheStore flag. + * @param keepBinary Keep binary flag. + * @param recovery Recovery mode flag. + * @return Flags. 
+ */ + static byte flags( + boolean nearCache, + boolean topLocked, + boolean retval, + boolean affMapping, + boolean needPrimaryRes, + boolean skipStore, + boolean keepBinary, + boolean recovery) { + byte flags = 0; + + if (nearCache) + flags |= NEAR_CACHE_FLAG_MASK; - if (needPrimaryRes) - needPrimaryResponse(true); if (topLocked) - topologyLocked(true); + flags |= TOP_LOCKED_FLAG_MASK; + if (retval) - returnValue(true); + flags |= RET_VAL_FLAG_MASK; + + if (affMapping) + flags |= AFFINITY_MAPPING_FLAG_MASK; + + if (needPrimaryRes) + flags |= NEED_PRIMARY_RES_FLAG_MASK; + if (skipStore) - skipStore(true); + flags |= SKIP_STORE_FLAG_MASK; + if (keepBinary) - keepBinary(true); + flags |= KEEP_BINARY_FLAG_MASK; + if (recovery) - recovery(true); + flags |= RECOVERY_FLAG_MASK; + + return flags; + } + + /** + * @return {@code True} if originating node detected that rebalancing finished and + * expects that update is mapped using current affinity. + */ + boolean affinityMapping() { + return isFlag(AFFINITY_MAPPING_FLAG_MASK); + } + + /** + * @return {@code True} if near cache is enabled on node initiated operation. 
+ */ + public boolean nearCache() { + return isFlag(NEAR_CACHE_FLAG_MASK); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java index f175ab3..4b9aef0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java @@ -117,22 +117,19 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat /** * Constructor. - * @param cacheId Cache ID. + * + * @param cacheId Cache ID. * @param nodeId Node ID. * @param futId Future ID. * @param topVer Topology version. - * @param topLocked Topology locked flag. * @param syncMode Synchronization mode. * @param op Cache update operation. - * @param retval Return value required flag. * @param expiryPlc Expiry policy. * @param invokeArgs Optional arguments for entry processor. * @param filter Optional filter for atomic check. * @param subjId Subject ID. * @param taskNameHash Task name hash code. - * @param needPrimaryRes {@code True} if client expects primary node response. - * @param skipStore Skip write-through to a persistent storage. - * @param keepBinary Keep binary flag. + * @param flags Flags. * @param addDepInfo Deployment info flag. * @param maxEntryCnt Maximum entries count. 
*/ @@ -141,19 +138,14 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat UUID nodeId, long futId, @NotNull AffinityTopologyVersion topVer, - boolean topLocked, CacheWriteSynchronizationMode syncMode, GridCacheOperation op, - boolean retval, @Nullable ExpiryPolicy expiryPlc, @Nullable Object[] invokeArgs, @Nullable CacheEntryPredicate[] filter, @Nullable UUID subjId, int taskNameHash, - boolean needPrimaryRes, - boolean skipStore, - boolean keepBinary, - boolean recovery, + byte flags, boolean addDepInfo, int maxEntryCnt ) { @@ -161,17 +153,13 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat nodeId, futId, topVer, - topLocked, syncMode, op, - retval, subjId, taskNameHash, - needPrimaryRes, - skipStore, - keepBinary, - recovery, + flags, addDepInfo); + this.expiryPlc = expiryPlc; this.invokeArgs = invokeArgs; this.filter = filter; http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java index f5bd889..5c66bc4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java @@ -55,20 +55,17 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl /** * Constructor. - * @param cacheId Cache ID. + * + * @param cacheId Cache ID. * @param nodeId Node ID. 
* @param futId Future ID. * @param topVer Topology version. - * @param topLocked Topology locked flag. * @param syncMode Synchronization mode. * @param op Cache update operation. - * @param retval Return value required flag. * @param filter Optional filter for atomic check. * @param subjId Subject ID. * @param taskNameHash Task name hash code. - * @param needPrimaryRes {@code True} if client expects primary node response. - * @param skipStore Skip write-through to a persistent storage. - * @param keepBinary Keep binary flag. + * @param flags Flags. * @param addDepInfo Deployment info flag. */ GridNearAtomicSingleUpdateFilterRequest( @@ -76,17 +73,12 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl UUID nodeId, long futId, @NotNull AffinityTopologyVersion topVer, - boolean topLocked, CacheWriteSynchronizationMode syncMode, GridCacheOperation op, - boolean retval, @Nullable CacheEntryPredicate[] filter, @Nullable UUID subjId, int taskNameHash, - boolean needPrimaryRes, - boolean skipStore, - boolean keepBinary, - boolean recovery, + byte flags, boolean addDepInfo ) { super( @@ -94,16 +86,11 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl nodeId, futId, topVer, - topLocked, syncMode, op, - retval, subjId, taskNameHash, - needPrimaryRes, - skipStore, - keepBinary, - recovery, + flags, addDepInfo ); http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 6ffa373..60d94b4 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -547,8 +547,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda else val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val); - boolean mappingKnown = cctx.topology().rebalanceFinished(topVer) && - !cctx.discovery().hasNearCache(cctx.cacheId(), topVer); + boolean mappingKnown = cctx.topology().rebalanceFinished(topVer); List<ClusterNode> nodes = cctx.affinity().nodesByKey(cacheKey, topVer); @@ -558,10 +557,19 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda ClusterNode primary = nodes.get(0); - boolean needPrimaryRes = !mappingKnown || primary.isLocal() || nodes.size() == 1; + boolean needPrimaryRes = !mappingKnown || primary.isLocal() || nodes.size() == 1 || nearEnabled; GridNearAtomicAbstractUpdateRequest req; + byte flags = GridNearAtomicAbstractUpdateRequest.flags(nearEnabled, + topLocked, + retval, + mappingKnown, + needPrimaryRes, + skipStore, + keepBinary, + recovery); + if (canUseSingleRequest()) { if (op == TRANSFORM) { req = new GridNearAtomicSingleUpdateInvokeRequest( @@ -569,17 +577,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda primary.id(), futId, topVer, - topLocked, syncMode, op, - retval, invokeArgs, subjId, taskNameHash, - needPrimaryRes, - skipStore, - keepBinary, - recovery, + flags, cctx.deploymentEnabled()); } else { @@ -589,16 +592,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda primary.id(), futId, topVer, - topLocked, syncMode, op, - retval, subjId, taskNameHash, - needPrimaryRes, - skipStore, - keepBinary, - recovery, + flags, cctx.deploymentEnabled()); } else { @@ -607,17 +605,12 @@ public 
class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda primary.id(), futId, topVer, - topLocked, syncMode, op, - retval, filter, subjId, taskNameHash, - needPrimaryRes, - skipStore, - keepBinary, - recovery, + flags, cctx.deploymentEnabled()); } } @@ -628,19 +621,14 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda primary.id(), futId, topVer, - topLocked, syncMode, op, - retval, expiryPlc, invokeArgs, filter, subjId, taskNameHash, - needPrimaryRes, - skipStore, - keepBinary, - recovery, + flags, cctx.deploymentEnabled(), 1); } http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java index 850a0f9..b1fb530 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java @@ -73,20 +73,17 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl /** * Constructor. - * @param cacheId Cache ID. + * + * @param cacheId Cache ID. * @param nodeId Node ID. * @param futId Future ID. * @param topVer Topology version. - * @param topLocked Topology locked flag. * @param syncMode Synchronization mode. * @param op Cache update operation. - * @param retval Return value required flag. * @param invokeArgs Optional arguments for entry processor. * @param subjId Subject ID. 
* @param taskNameHash Task name hash code. - * @param needPrimaryRes {@code True} if client expects primary node response. - * @param skipStore Skip write-through to a persistent storage. - * @param keepBinary Keep binary flag. + * @param flags Flags. * @param addDepInfo Deployment info flag. */ GridNearAtomicSingleUpdateInvokeRequest( @@ -94,17 +91,12 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl UUID nodeId, long futId, @NotNull AffinityTopologyVersion topVer, - boolean topLocked, CacheWriteSynchronizationMode syncMode, GridCacheOperation op, - boolean retval, @Nullable Object[] invokeArgs, @Nullable UUID subjId, int taskNameHash, - boolean needPrimaryRes, - boolean skipStore, - boolean keepBinary, - boolean recovery, + byte flags, boolean addDepInfo ) { super( @@ -112,16 +104,11 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl nodeId, futId, topVer, - topLocked, syncMode, op, - retval, subjId, taskNameHash, - needPrimaryRes, - skipStore, - keepBinary, - recovery, + flags, addDepInfo ); http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java index 78e0f5d..dd3a7be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java @@ -71,15 +71,11 @@ public class GridNearAtomicSingleUpdateRequest 
extends GridNearAtomicAbstractSin * @param nodeId Node ID. * @param futId Future ID. * @param topVer Topology version. - * @param topLocked Topology locked flag. * @param syncMode Synchronization mode. * @param op Cache update operation. - * @param retval Return value required flag. * @param subjId Subject ID. * @param taskNameHash Task name hash code. - * @param needPrimaryRes {@code True} if client expects primary node response. - * @param skipStore Skip write-through to a persistent storage. - * @param keepBinary Keep binary flag. + * @param flags Flags. * @param addDepInfo Deployment info flag. */ GridNearAtomicSingleUpdateRequest( @@ -87,32 +83,22 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin UUID nodeId, long futId, @NotNull AffinityTopologyVersion topVer, - boolean topLocked, CacheWriteSynchronizationMode syncMode, GridCacheOperation op, - boolean retval, @Nullable UUID subjId, int taskNameHash, - boolean needPrimaryRes, - boolean skipStore, - boolean keepBinary, - boolean recovery, + byte flags, boolean addDepInfo ) { super(cacheId, nodeId, futId, topVer, - topLocked, syncMode, op, - retval, subjId, taskNameHash, - needPrimaryRes, - skipStore, - keepBinary, - recovery, + flags, addDepInfo ); } http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 46a3c34..190ed3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -684,11 +684,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } else { try { - if (req.initMappingLocally() && reqState.dhtNodes.isEmpty()) { - reqState.dhtNodes = null; - - req.needPrimaryResponse(true); - } + if (req.initMappingLocally() && reqState.mappedNodes.isEmpty()) + reqState.resetLocalMapping(); cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); @@ -753,8 +750,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu int size = keys.size(); - boolean mappingKnown = cctx.topology().rebalanceFinished(topVer) && - !cctx.discovery().hasNearCache(cctx.cacheId(), topVer); + boolean mappingKnown = cctx.topology().rebalanceFinished(topVer); try { if (size == 1) { @@ -1001,31 +997,35 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu ClusterNode primary = nodes.get(0); - boolean needPrimaryRes = !mappingKnown || primary.isLocal(); + boolean needPrimaryRes = !mappingKnown || primary.isLocal() || nearEnabled; UUID nodeId = primary.id(); PrimaryRequestState mapped = pendingMappings.get(nodeId); if (mapped == null) { + byte flags = GridNearAtomicAbstractUpdateRequest.flags(nearEnabled, + topLocked, + retval, + mappingKnown, + needPrimaryRes, + skipStore, + keepBinary, + recovery); + GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest( cctx.cacheId(), nodeId, futId, topVer, - topLocked, syncMode, op, - retval, expiryPlc, invokeArgs, filter, subjId, taskNameHash, - needPrimaryRes, - skipStore, - keepBinary, - recovery, + flags, cctx.deploymentEnabled(), keys.size()); @@ -1112,26 +1112,30 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu ClusterNode primary = nodes.get(0); - boolean needPrimaryRes = !mappingKnown || primary.isLocal() || nodes.size() == 1; + boolean needPrimaryRes = 
!mappingKnown || primary.isLocal() || nodes.size() == 1 || nearEnabled; + + byte flags = GridNearAtomicAbstractUpdateRequest.flags(nearEnabled, + topLocked, + retval, + mappingKnown, + needPrimaryRes, + skipStore, + keepBinary, + recovery); GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest( cctx.cacheId(), primary.id(), futId, topVer, - topLocked, syncMode, op, - retval, expiryPlc, invokeArgs, filter, subjId, taskNameHash, - needPrimaryRes, - skipStore, - keepBinary, - recovery, + flags, cctx.deploymentEnabled(), 1); http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index 5ba024f..9492164 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -78,7 +78,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements /** */ @GridDirectCollection(UUID.class) @GridToStringInclude - private List<UUID> dhtNodes; + private List<UUID> mapping; /** */ @GridDirectTransient @@ -126,17 +126,17 @@ public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements } /** - * @param dhtNodes DHT nodes. + * @param mapping Mapping. */ - public void dhtNodes(List<UUID> dhtNodes) { - this.dhtNodes = dhtNodes; + public void mapping(List<UUID> mapping) { + this.mapping = mapping; } /** * @return DHT nodes. 
*/ - @Nullable public List<UUID> dhtNodes() { - return dhtNodes; + @Nullable public List<UUID> mapping() { + return mapping; } /** @@ -406,19 +406,19 @@ public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements switch (writer.state()) { case 3: - if (!writer.writeCollection("dhtNodes", dhtNodes, MessageCollectionItemType.UUID)) + if (!writer.writeMessage("errs", errs)) return false; writer.incrementState(); case 4: - if (!writer.writeMessage("errs", errs)) + if (!writer.writeLong("futId", futId)) return false; writer.incrementState(); case 5: - if (!writer.writeLong("futId", futId)) + if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID)) return false; writer.incrementState(); @@ -464,7 +464,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements switch (reader.state()) { case 3: - dhtNodes = reader.readCollection("dhtNodes", MessageCollectionItemType.UUID); + errs = reader.readMessage("errs"); if (!reader.isLastRead()) return false; @@ -472,7 +472,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements reader.incrementState(); case 4: - errs = reader.readMessage("errs"); + futId = reader.readLong("futId"); if (!reader.isLastRead()) return false; @@ -480,7 +480,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements reader.incrementState(); case 5: - futId = reader.readLong("futId"); + mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java index 749ebe8..0bfb4fb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; @@ -747,6 +748,133 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testPutReaderUpdate1() throws Exception { + readerUpdateDhtFails(false, false, false); + + stopAllGrids(); + + readerUpdateDhtFails(false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testPutReaderUpdate2() throws Exception { + readerUpdateDhtFails(true, false, false); + + stopAllGrids(); + + readerUpdateDhtFails(true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllReaderUpdate1() throws Exception { + readerUpdateDhtFails(false, false, true); + + stopAllGrids(); + + readerUpdateDhtFails(false, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllReaderUpdate2() throws Exception { + readerUpdateDhtFails(true, false, true); + + stopAllGrids(); + + readerUpdateDhtFails(true, true, true); + } + + /** + * @param updateNearEnabled {@code True} if enable near cache for second put. 
+ * @param delayReader If {@code true} delay reader response, otherwise delay backup response. + * @param putAll If {@code true} use putAll, otherwise put. + * @throws Exception If failed. + */ + private void readerUpdateDhtFails(boolean updateNearEnabled, + boolean delayReader, + boolean putAll) throws Exception { + ccfg = cacheConfiguration(1, FULL_SYNC); + + client = false; + + startServers(2); + + awaitPartitionMapExchange(); + + Ignite srv0 = ignite(0); + Ignite srv1 = ignite(1); + + List<Integer> keys = primaryKeys(srv0.cache(TEST_CACHE), putAll ? 3 : 1); + + ccfg = null; + + client = true; + + Ignite client1 = startGrid(2); + + IgniteCache<Object, Object> cache1 = client1.createNearCache(TEST_CACHE, new NearCacheConfiguration<>()); + + Ignite client2 = startGrid(3); + + IgniteCache<Object, Object> cache2 = updateNearEnabled ? + client2.createNearCache(TEST_CACHE, new NearCacheConfiguration<>()) : client2.cache(TEST_CACHE); + + if (putAll) { + Map<Integer, Integer> map = new HashMap<>(); + + for (Integer key : keys) + map.put(key, 1); + + cache1.putAll(map); + } + else + cache1.put(keys.get(0), 1); + + if (delayReader) + testSpi(client1).blockMessages(GridDhtAtomicNearResponse.class, client2.name()); + else + testSpi(srv1).blockMessages(GridDhtAtomicNearResponse.class, client2.name()); + + Map<Integer, Integer> map; + + IgniteFuture<?> fut; + + if (putAll) { + map = new HashMap<>(); + + for (Integer key : keys) + map.put(key, 1); + + fut = cache2.putAllAsync(map); + } + else { + map = F.asMap(keys.get(0), 2); + + fut = cache2.putAsync(keys.get(0), 2); + } + + U.sleep(2000); + + assertFalse(fut.isDone()); + + if (delayReader) + testSpi(client1).stopBlock(); + else + testSpi(srv1).stopBlock(); + + fut.get(); + + checkData(map); + } + + /** * @param expData Expected cache data. */ private void checkData(Map<Integer, Integer> expData) {
