Repository: ignite Updated Branches: refs/heads/ignite-5272 f34672efb -> 1ef802ed4
ignite-5272 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1ef802ed Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1ef802ed Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1ef802ed Branch: refs/heads/ignite-5272 Commit: 1ef802ed458152d550388f8fc66a1df83fd30951 Parents: f34672e Author: sboikov <[email protected]> Authored: Wed Jun 7 22:42:42 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Jun 7 23:18:04 2017 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtCacheAdapter.java | 20 +-- .../distributed/dht/GridDhtCacheEntry.java | 10 +- .../cache/distributed/dht/GridDhtGetFuture.java | 16 ++- .../distributed/dht/GridDhtGetSingleFuture.java | 10 +- .../cache/distributed/dht/GridDhtTxLocal.java | 2 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 3 +- .../dht/GridPartitionedGetFuture.java | 2 + .../dht/GridPartitionedSingleGetFuture.java | 7 +- .../dht/atomic/GridDhtAtomicCache.java | 2 +- ...idNearAtomicAbstractSingleUpdateRequest.java | 2 + .../GridNearAtomicAbstractUpdateRequest.java | 8 +- .../atomic/GridNearAtomicFullUpdateRequest.java | 2 + ...GridNearAtomicSingleUpdateFilterRequest.java | 2 + .../GridNearAtomicSingleUpdateFuture.java | 4 + ...GridNearAtomicSingleUpdateInvokeRequest.java | 2 + .../GridNearAtomicSingleUpdateRequest.java | 2 + .../dht/atomic/GridNearAtomicUpdateFuture.java | 2 + .../distributed/near/GridNearGetFuture.java | 4 +- .../distributed/near/GridNearGetRequest.java | 144 ++++++++----------- .../cache/distributed/near/GridNearTxLocal.java | 15 +- .../cache/transactions/IgniteTxEntry.java | 14 +- .../transactions/IgniteTxLocalAdapter.java | 6 +- 22 files changed, 144 insertions(+), 135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 418d712..6427140 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -726,15 +726,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap return sum; } - /** - * This method is used internally. Use - * {@link #getDhtAsync(UUID, long, Map, boolean, AffinityTopologyVersion, UUID, int, IgniteCacheExpiryPolicy, boolean, boolean)} - * method instead to retrieve DHT value. - * @param keys {@inheritDoc} - * @param forcePrimary {@inheritDoc} - * @param skipTx {@inheritDoc} - * @param needVer Need version. @return {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public IgniteInternalFuture<Map<K, V>> getAllAsync( @Nullable Collection<? extends K> keys, boolean forcePrimary, @@ -816,6 +808,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap public GridDhtFuture<Collection<GridCacheEntryInfo>> getDhtAsync(UUID reader, long msgId, Map<KeyCacheObject, Boolean> keys, + boolean addReaders, boolean readThrough, AffinityTopologyVersion topVer, @Nullable UUID subjId, @@ -834,7 +827,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap taskNameHash, expiry, skipVals, - recovery); + recovery, + addReaders); fut.init(); @@ -854,7 +848,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param skipVals Skip vals flag. * @return Future for the operation. */ - private IgniteInternalFuture<GridCacheEntryInfo> getDhtSingleAsync( + public GridDhtGetSingleFuture getDhtSingleAsync( UUID nodeId, long msgId, KeyCacheObject key, @@ -867,7 +861,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap boolean skipVals, boolean recovery ) { - GridDhtGetSingleFuture<K, V> fut = new GridDhtGetSingleFuture<>( + GridDhtGetSingleFuture fut = new GridDhtGetSingleFuture<>( ctx, msgId, nodeId, @@ -992,7 +986,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap */ protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest req) { assert ctx.affinityNode(); - assert !req.reload() : req; final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.fromRemote(req.createTtl(), req.accessTtl()); @@ -1000,6 +993,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap getDhtAsync(nodeId, req.messageId(), req.keys(), + req.addReaders(), req.readThrough(), req.topologyVersion(), req.subjectId(), http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 2e86fb0..0265474 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -407,14 +407,6 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { return null; } - // If remote node has no near cache, don't add it. - if (!cctx.discovery().cacheNearNode(node, cacheName())) { - if (log.isDebugEnabled()) - log.debug("Ignoring near reader because near cache is disabled: " + nodeId); - - return null; - } - // If remote node is (primary?) or back up, don't add it as a reader. if (cctx.affinity().partitionBelongs(node, partition(), topVer)) { if (log.isDebugEnabled()) @@ -644,7 +636,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { for (int i = 0; i < rdrs.length; i++) { ClusterNode node = cctx.discovery().getAlive(rdrs[i].nodeId()); - if (node == null || !cctx.discovery().cacheNode(node, cacheName())) { + if (node == null) { // Node has left and if new list has already been created, just skip. // Otherwise, create new list and add alive nodes. if (newRdrs == null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 8031c8f..29bf3a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -112,6 +112,9 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col /** */ private final boolean recovery; + /** */ + private final boolean addReaders; + /** * @param cctx Context. * @param msgId Message ID. @@ -135,7 +138,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col int taskNameHash, @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean skipVals, - boolean recovery + boolean recovery, + boolean addReaders ) { super(CU.<GridCacheEntryInfo>collectionsReducer(keys.size())); @@ -153,6 +157,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col this.expiryPlc = expiryPlc; this.skipVals = skipVals; this.recovery = recovery; + this.addReaders = addReaders; futId = IgniteUuid.randomUuid(); @@ -346,12 +351,13 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col GridCompoundFuture<Boolean, Boolean> txFut = null; - ClusterNode readerNode = cctx.discovery().node(reader); - ReaderArguments readerArgs = null; - if (readerNode != null && !readerNode.isLocal() && cctx.discovery().cacheNearNode(readerNode, cctx.name())) { + if (addReaders && !skipVals && !cctx.localNodeId().equals(reader)) { for (Map.Entry<KeyCacheObject, Boolean> k : keys.entrySet()) { + if (!k.getValue()) + continue; + while (true) { GridDhtCacheEntry e = cache().entryExx(k.getKey(), topVer); @@ -359,7 +365,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col if (e.obsolete()) continue; - boolean addReader = (!e.deleted() && k.getValue() && !skipVals); + boolean addReader = !e.deleted(); if (addReader) { e.unswap(false); http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java index b9007ba..095fe77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java @@ -73,7 +73,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa private KeyCacheObject key; /** */ - private boolean addRdr; + private final boolean addRdr; /** Reserved partitions. */ private int part = -1; @@ -123,7 +123,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa long msgId, UUID reader, KeyCacheObject key, - Boolean addRdr, + boolean addRdr, boolean readThrough, @NotNull AffinityTopologyVersion topVer, @Nullable UUID subjId, @@ -303,11 +303,9 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa IgniteInternalFuture<Boolean> rdrFut = null; - ClusterNode readerNode = cctx.discovery().node(reader); - ReaderArguments readerArgs = null; - if (readerNode != null && !readerNode.isLocal() && cctx.discovery().cacheNearNode(readerNode, cctx.name())) { + if (addRdr && !skipVals && !cctx.localNodeId().equals(reader)) { while (true) { GridDhtCacheEntry e = cache().entryExx(key, topVer); @@ -315,7 +313,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa if (e.obsolete()) continue; - boolean addReader = (!e.deleted() && this.addRdr && !skipVals); + boolean addReader = !e.deleted(); if (addReader) { e.unswap(false); http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 26f08fa..e1bc313 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -265,7 +265,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa @Override @Nullable protected IgniteInternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry cached, IgniteTxEntry entry, AffinityTopologyVersion topVer) { // Don't add local node as reader. - if (!cctx.localNodeId().equals(nearNodeId)) { + if (entry.addReader() && !cctx.localNodeId().equals(nearNodeId)) { GridCacheContext cacheCtx = cached.context(); while (true) { http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 8b51cb5..8047bda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -612,7 +612,8 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { -1L, null, skipStore, - keepBinary); + keepBinary, + CU.isNearEnabled(cacheCtx)); if (read) txEntry.ttl(accessTtl); http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 5543cec..8db0ea1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -292,6 +292,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda cache().getDhtAsync(n.id(), -1, mappedKeys, + false, readThrough, topVer, subjId, @@ -351,6 +352,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda taskName == null ? 0 : taskName.hashCode(), expiryPlc != null ? expiryPlc.forCreate() : -1L, expiryPlc != null ? expiryPlc.forAccess() : -1L, + false, skipVals, cctx.deploymentEnabled(), recovery); http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index 3f612f7..a6883e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -221,11 +221,10 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec return; if (node.isLocal()) { - Map<KeyCacheObject, Boolean> map = Collections.singletonMap(key, false); - - final GridDhtFuture<Collection<GridCacheEntryInfo>> fut = cctx.dht().getDhtAsync(node.id(), + final GridDhtFuture<Collection<GridCacheEntryInfo>> fut = cctx.dht().getDhtSingleAsync(node.id(), -1, - map, + key, + false, readThrough, topVer, subjId, http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 047aa93..873f4bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1796,7 +1796,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { locked = lockEntries(req, req.topologyVersion()); - boolean hasNear = req.nearEnabled(); + boolean hasNear = req.nearCache(); // Assign next version for update inside entries lock. GridCacheVersion ver = ctx.versions().next(top.topologyVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java index 9a6909a..97c973f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java @@ -63,6 +63,7 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear */ protected GridNearAtomicAbstractSingleUpdateRequest( int cacheId, + boolean nearCache, UUID nodeId, long futId, @NotNull AffinityTopologyVersion topVer, @@ -79,6 +80,7 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear boolean addDepInfo ) { super(cacheId, + nearCache, nodeId, futId, topVer, http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java index 3a66c4d..5566e94 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java @@ -126,6 +126,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheIdMes */ protected GridNearAtomicAbstractUpdateRequest( int cacheId, + boolean nearCache, UUID nodeId, long futId, @NotNull AffinityTopologyVersion topVer, @@ -163,9 +164,14 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheIdMes keepBinary(true); if (recovery) recovery(true); + if (nearCache) + setFlag(true, NEAR_CACHE_FLAG_MASK); } - public boolean nearEnabled() { + /** + * @return {@code True} if near cache is enabled on node initiated operation. + */ + public boolean nearCache() { return isFlag(NEAR_CACHE_FLAG_MASK); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java index f175ab3..28319a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java @@ -138,6 +138,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat */ GridNearAtomicFullUpdateRequest( int cacheId, + boolean nearCache, UUID nodeId, long futId, @NotNull AffinityTopologyVersion topVer, @@ -158,6 +159,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat int maxEntryCnt ) { super(cacheId, + nearCache, nodeId, futId, topVer, http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java index f5bd889..31e6c5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java @@ -73,6 +73,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl */ GridNearAtomicSingleUpdateFilterRequest( int cacheId, + boolean nearCache, UUID nodeId, long futId, @NotNull AffinityTopologyVersion topVer, @@ -91,6 +92,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl ) { super( cacheId, + nearCache, nodeId, futId, topVer, http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 6ffa373..2f8473d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -566,6 +566,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda if (op == TRANSFORM) { req = new GridNearAtomicSingleUpdateInvokeRequest( cctx.cacheId(), + nearEnabled, primary.id(), futId, topVer, @@ -586,6 +587,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda if (filter == null || filter.length == 0) { req = new GridNearAtomicSingleUpdateRequest( cctx.cacheId(), + nearEnabled, primary.id(), futId, topVer, @@ -604,6 +606,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda else { req = new GridNearAtomicSingleUpdateFilterRequest( cctx.cacheId(), + nearEnabled, primary.id(), futId, topVer, @@ -625,6 +628,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda else { req = new GridNearAtomicFullUpdateRequest( cctx.cacheId(), + nearEnabled, primary.id(), futId, topVer, http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java index 850a0f9..639e4d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java @@ -91,6 +91,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl */ GridNearAtomicSingleUpdateInvokeRequest( int cacheId, + boolean nearCache, UUID nodeId, long futId, @NotNull AffinityTopologyVersion topVer, @@ -109,6 +110,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl ) { super( cacheId, + nearCache, nodeId, futId, topVer, http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java index 78e0f5d..1b858d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java @@ -84,6 +84,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin */ GridNearAtomicSingleUpdateRequest( int cacheId, + boolean nearCache, UUID nodeId, long futId, @NotNull AffinityTopologyVersion topVer, @@ -100,6 +101,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin boolean addDepInfo ) { super(cacheId, + nearCache, nodeId, futId, topVer, http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 46a3c34..b299db0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -1010,6 +1010,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (mapped == null) { GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest( cctx.cacheId(), + nearEnabled, nodeId, futId, topVer, @@ -1116,6 +1117,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest( cctx.cacheId(), + nearEnabled, primary.id(), futId, topVer, http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index c7c6e9e..d4769a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -314,6 +314,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap dht().getDhtAsync(n.id(), -1, mappedKeys, + false, readThrough, topVer, subjId, @@ -379,6 +380,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap taskName == null ? 0 : taskName.hashCode(), expiryPlc != null ? expiryPlc.forCreate() : -1L, expiryPlc != null ? expiryPlc.forAccess() : -1L, + true, skipVals, cctx.deploymentEnabled(), recovery); @@ -445,7 +447,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap EntryGetResult res = entry.innerGetVersioned( null, null, - /**update-metrics*/true, + /*update-metrics*/true, /*event*/!skipVals, subjId, null, http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java index 6092511..2ed46f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java @@ -55,6 +55,18 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD /** */ private static final long serialVersionUID = 0L; + /** */ + private static final int READ_THROUGH_FLAG_MASK = 0x01; + + /** */ + private static final int SKIP_VALS_FLAG_MASK = 0x02; + + /** */ + private static final int ADD_READER_FLAG_MASK = 0x04; + + /** */ + public static final int RECOVERY_FLAG_MASK = 0x08; + /** Future ID. */ private IgniteUuid futId; @@ -75,19 +87,10 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD /** */ @GridDirectCollection(boolean.class) - private Collection<Boolean> flags; - - /** Reload flag. */ - private boolean reload; + private List<Boolean> readersFlags; - /** Read through flag. */ - private boolean readThrough; - - /** Skip values flag. Used for {@code containsKey} method. */ - private boolean skipVals; - - /** Recovery flag. */ - private boolean recovery; + /** */ + private byte flags; /** Topology version. */ private AffinityTopologyVersion topVer; @@ -139,6 +142,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD int taskNameHash, long createTtl, long accessTtl, + boolean addReader, boolean skipVals, boolean addDepInfo, boolean recovery @@ -153,22 +157,35 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD this.ver = ver; this.keys = new ArrayList<>(keys.size()); - flags = new ArrayList<>(keys.size()); + + if (addReader) + readersFlags = new ArrayList<>(keys.size()); for (Map.Entry<KeyCacheObject, Boolean> entry : keys.entrySet()) { this.keys.add(entry.getKey()); - flags.add(entry.getValue()); + + if (addReader) + readersFlags.add(entry.getValue()); } - this.readThrough = readThrough; this.topVer = topVer; this.subjId = subjId; this.taskNameHash = taskNameHash; this.createTtl = createTtl; this.accessTtl = accessTtl; - this.skipVals = skipVals; this.addDepInfo = addDepInfo; - this.recovery = recovery; + + if (readThrough) + flags |= READ_THROUGH_FLAG_MASK; + + if (skipVals) + flags |= SKIP_VALS_FLAG_MASK; + + if (addReader) + flags |= ADD_READER_FLAG_MASK; + + if (recovery) + flags |= RECOVERY_FLAG_MASK; } /** @@ -214,17 +231,10 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD } /** - * @return Reload flag. - */ - public boolean reload() { - return reload; - } - - /** * @return Read through flag. */ public boolean readThrough() { - return readThrough; + return (flags & READ_THROUGH_FLAG_MASK) != 0; } /** @@ -232,14 +242,18 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD * returned as future result. */ public boolean skipValues() { - return skipVals; + return (flags & SKIP_VALS_FLAG_MASK) != 0; } /** * @return Recovery flag. */ public boolean recovery() { - return recovery; + return (flags & RECOVERY_FLAG_MASK) != 0; + } + + public boolean addReaders() { + return (flags & ADD_READER_FLAG_MASK) != 0; } /** @@ -277,7 +291,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD assert ctx != null; assert !F.isEmpty(keys); - assert keys.size() == flags.size(); + assert readersFlags == null || keys.size() == readersFlags.size(); GridCacheContext cctx = ctx.cacheContext(cacheId); @@ -297,16 +311,18 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD finishUnmarshalCacheObjects(keys, cctx, ldr); assert !F.isEmpty(keys); - assert keys.size() == flags.size(); + assert readersFlags == null || keys.size() == readersFlags.size(); if (keyMap == null) { keyMap = U.newLinkedHashMap(keys.size()); Iterator<KeyCacheObject> keysIt = keys.iterator(); - Iterator<Boolean> flagsIt = flags.iterator(); - while (keysIt.hasNext()) - keyMap.put(keysIt.next(), flagsIt.next()); + for (int i = 0; i < keys.size(); i++) { + Boolean addRdr = readersFlags != null ? readersFlags.get(i) : Boolean.FALSE; + + keyMap.put(keysIt.next(), addRdr); + } } } @@ -343,7 +359,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD writer.incrementState(); case 5: - if (!writer.writeCollection("flags", flags, MessageCollectionItemType.BOOLEAN)) + if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); @@ -367,48 +383,30 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD writer.incrementState(); case 9: - if (!writer.writeBoolean("readThrough", readThrough)) + if (!writer.writeCollection("readersFlags", readersFlags, MessageCollectionItemType.BOOLEAN)) return false; writer.incrementState(); case 10: - if (!writer.writeBoolean("recovery", recovery)) - return false; - - writer.incrementState(); - - case 11: - if (!writer.writeBoolean("reload", reload)) - return false; - - writer.incrementState(); - - case 12: - if (!writer.writeBoolean("skipVals", skipVals)) - return false; - - writer.incrementState(); - - case 13: if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); - case 14: + case 11: if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); - case 15: + case 12: if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); - case 16: + case 13: if (!writer.writeMessage("ver", ver)) return false; @@ -447,7 +445,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD reader.incrementState(); case 5: - flags = reader.readCollection("flags", MessageCollectionItemType.BOOLEAN); + flags = reader.readByte("flags"); if (!reader.isLastRead()) return false; @@ -479,7 +477,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD reader.incrementState(); case 9: - readThrough = reader.readBoolean("readThrough"); + readersFlags = reader.readCollection("readersFlags", MessageCollectionItemType.BOOLEAN); if (!reader.isLastRead()) return false; @@ -487,30 +485,6 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD reader.incrementState(); case 10: - recovery = reader.readBoolean("recovery"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 11: - reload = reader.readBoolean("reload"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 12: - skipVals = reader.readBoolean("skipVals"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 13: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -518,7 +492,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD reader.incrementState(); - case 14: + case 11: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -526,7 +500,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD reader.incrementState(); - case 15: + case 12: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -534,7 +508,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD reader.incrementState(); - case 16: + case 13: ver = reader.readMessage("ver"); if (!reader.isLastRead()) @@ -554,7 +528,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 17; + return 14; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 8c10e53..55905b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -1217,7 +1217,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea drExpireTime, drVer, skipStore, - keepBinary); + keepBinary, + CU.isNearEnabled(cacheCtx)); } else { txEntry = addEntry(READ, @@ -1232,7 +1233,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea -1L, null, skipStore, - keepBinary); + keepBinary, + CU.isNearEnabled(cacheCtx)); } txEntry.markValid(); @@ -1262,7 +1264,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea drExpireTime, drVer, skipStore, - keepBinary); + keepBinary, + CU.isNearEnabled(cacheCtx)); if (enlisted != null) enlisted.add(cacheKey); @@ -1365,7 +1368,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea drExpireTime, drVer, skipStore, - keepBinary); + keepBinary, + CU.isNearEnabled(cacheCtx)); if (enlisted != null) enlisted.add(cacheKey); @@ -2201,7 +2205,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea -1L, null, skipStore, - !deserializeBinary); + !deserializeBinary, + CU.isNearEnabled(cacheCtx)); // As optimization, mark as checked immediately // for non-pessimistic if value is not null. http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 30aa335..095d5ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -88,6 +88,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** Flag indicating that old value for 'invoke' operation was non null on primary node. */ private static final int TX_ENTRY_OLD_VAL_ON_PRIMARY = 0x04; + private static final int TX_ENTRY_ADD_READER_FLAG_MASK = 0x08; + /** Prepared flag updater. */ private static final AtomicIntegerFieldUpdater<IgniteTxEntry> PREPARED_UPD = AtomicIntegerFieldUpdater.newUpdater(IgniteTxEntry.class, "prepared"); @@ -287,7 +289,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { CacheEntryPredicate[] filters, GridCacheVersion conflictVer, boolean skipStore, - boolean keepBinary + boolean keepBinary, + boolean addReader ) { assert ctx != null; assert tx != null; @@ -304,6 +307,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { skipStore(skipStore); keepBinary(keepBinary); + addReader(addReader); if (entryProcessor != null) addEntryProcessor(entryProcessor, invokeArgs); @@ -523,6 +527,14 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { return isFlag(TX_ENTRY_KEEP_BINARY_FLAG_MASK); } + public void addReader(boolean addReader) { + setFlag(addReader, TX_ENTRY_ADD_READER_FLAG_MASK); + } + + public boolean addReader() { + return isFlag(TX_ENTRY_ADD_READER_FLAG_MASK); + } + /** * Sets flag mask. * http://git-wip-us.apache.org/repos/asf/ignite/blob/1ef802ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 52a0f56..e4b850d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1279,7 +1279,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig long drExpireTime, @Nullable GridCacheVersion drVer, boolean skipStore, - boolean keepBinary + boolean keepBinary, + boolean addReader ) { assert invokeArgs == null || op == TRANSFORM; @@ -1355,7 +1356,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig filter, drVer, skipStore, - keepBinary); + keepBinary, + addReader); txEntry.conflictExpireTime(drExpireTime);
