Optimization for single key cache 'get' operation.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1f103067 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1f103067 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1f103067 Branch: refs/heads/ignite-801 Commit: 1f1030670a6e7f9fbad1d939301c884f29b7885a Parents: ba1d563 Author: sboikov <[email protected]> Authored: Thu Nov 19 17:29:04 2015 +0300 Committer: sboikov <[email protected]> Committed: Thu Nov 19 17:29:04 2015 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 12 + .../processors/cache/GridCacheAdapter.java | 103 ++- .../processors/cache/GridCacheAtomicFuture.java | 6 + .../processors/cache/GridCacheFuture.java | 13 - .../processors/cache/GridCacheIoManager.java | 50 +- .../processors/cache/GridCacheMessage.java | 20 +- .../processors/cache/GridCacheMvccFuture.java | 7 + .../processors/cache/GridCacheMvccManager.java | 108 +-- .../distributed/GridCacheTxRecoveryFuture.java | 13 +- .../dht/CacheDistributedGetFutureAdapter.java | 27 +- .../cache/distributed/dht/CacheGetFuture.java | 32 + .../distributed/dht/GridDhtCacheAdapter.java | 141 ++++ .../distributed/dht/GridDhtLockFuture.java | 16 +- .../dht/GridDhtTransactionalCacheAdapter.java | 9 +- .../distributed/dht/GridDhtTxFinishFuture.java | 24 +- .../cache/distributed/dht/GridDhtTxLocal.java | 4 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 17 +- .../dht/GridPartitionedGetFuture.java | 69 +- .../dht/GridPartitionedSingleGetFuture.java | 697 +++++++++++++++++++ .../dht/atomic/GridDhtAtomicCache.java | 127 +++- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 84 +-- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 11 + .../dht/atomic/GridNearAtomicUpdateFuture.java | 5 - .../dht/colocated/GridDhtColocatedCache.java | 160 ++++- .../colocated/GridDhtColocatedLockFuture.java | 26 +- .../distributed/near/CacheVersionedValue.java | 2 +- .../distributed/near/GridNearCacheAdapter.java | 4 +- .../distributed/near/GridNearGetFuture.java | 57 +- .../distributed/near/GridNearGetRequest.java | 1 - .../distributed/near/GridNearGetResponse.java | 2 - .../distributed/near/GridNearLockFuture.java | 16 +- ...arOptimisticSerializableTxPrepareFuture.java | 17 +- .../near/GridNearOptimisticTxPrepareFuture.java | 19 +- .../GridNearPessimisticTxPrepareFuture.java | 19 +- .../near/GridNearSingleGetRequest.java | 396 +++++++++++ .../near/GridNearSingleGetResponse.java | 321 +++++++++ .../near/GridNearTransactionalCache.java | 2 +- .../near/GridNearTxFinishFuture.java | 24 +- .../cache/distributed/near/GridNearTxLocal.java | 149 ++-- .../near/GridNearTxPrepareFutureAdapter.java | 6 +- .../processors/cache/local/GridLocalCache.java | 4 +- .../cache/local/GridLocalLockFuture.java | 5 - .../cache/transactions/IgniteTxHandler.java | 19 +- .../transactions/IgniteTxLocalAdapter.java | 2 +- .../cache/transactions/IgniteTxManager.java | 2 +- .../IgniteClientReconnectCacheTest.java | 11 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 75 ++ .../GridCacheConcurrentTxMultiNodeTest.java | 15 - .../cache/GridCachePartitionedGetSelfTest.java | 3 +- .../IgniteCacheAbstractStopBusySelfTest.java | 27 +- .../IgniteCacheP2pUnmarshallingErrorTest.java | 184 +++-- .../CacheGetFutureHangsSelfTest.java | 6 + .../GridCacheAbstractNodeRestartSelfTest.java | 2 + .../IgniteCacheSingleGetMessageTest.java | 357 ++++++++++ .../GridCacheReplicatedMetricsSelfTest.java | 9 - .../testsuites/IgniteCacheTestSuite4.java | 3 + 56 files changed, 2908 insertions(+), 632 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 2503eda..3548aac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -83,6 +83,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; @@ -696,6 +698,16 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 116: + msg = new GridNearSingleGetRequest(); + + break; + + case 117: + msg = new GridNearSingleGetResponse(); + + break; + // [-3..114] - this // [120..123] - DR // [-4..-22] - SQL http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index cbb7486..562a0eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -599,11 +599,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> containsKeyAsync(K key) { + @Override public final IgniteInternalFuture<Boolean> containsKeyAsync(K key) { A.notNull(key, "key"); - return getAllAsync( - Collections.singletonList(key), + return (IgniteInternalFuture)getAsync( + key, /*force primary*/false, /*skip tx*/false, /*subj id*/null, @@ -611,15 +611,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /*deserialize portable*/false, /*skip values*/true, /*can remap*/true - ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() { - @Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException { - Map<K, V> map = fut.get(); - - assert map.isEmpty() || map.size() == 1 : map.size(); - - return map.isEmpty() ? false : map.values().iterator().next() != null; - } - }); + ); } /** {@inheritDoc} */ @@ -1473,6 +1465,52 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** + * @param key Key. + * @param forcePrimary Force primary. + * @param skipTx Skip tx. + * @param subjId Subj Id. + * @param taskName Task name. + * @param deserializePortable Deserialize portable. + * @param skipVals Skip values. + * @param canRemap Can remap flag. + * @return Future for the get operation. + */ + protected IgniteInternalFuture<V> getAsync( + final K key, + boolean forcePrimary, + boolean skipTx, + @Nullable UUID subjId, + String taskName, + boolean deserializePortable, + final boolean skipVals, + boolean canRemap + ) { + return getAllAsync(Collections.singletonList(key), + forcePrimary, + skipTx, + subjId, + taskName, + deserializePortable, + skipVals, + canRemap).chain( + new CX1<IgniteInternalFuture<Map<K, V>>, V>() { + @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { + Map<K, V> map = e.get(); + + assert map.isEmpty() || map.size() == 1 : map.size(); + + if (skipVals) { + Boolean val = map.isEmpty() ? false : (Boolean)F.firstValue(map); + + return (V)(val); + } + + return map.get(key); + } + }); + } + + /** * @param keys Keys. * @param forcePrimary Force primary. * @param skipTx Skip tx. @@ -1524,7 +1562,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @return Future for the get operation. * @see GridCacheAdapter#getAllAsync(Collection) */ - public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys, + public final IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys, boolean readThrough, boolean checkTx, @Nullable final UUID subjId, @@ -1605,11 +1643,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final boolean storeEnabled = !skipVals && readThrough && ctx.readThrough(); + final boolean needEntry = storeEnabled || ctx.isSwapOrOffheapEnabled(); + Map<KeyCacheObject, GridCacheVersion> misses = null; for (KeyCacheObject key : keys) { while (true) { - GridCacheEntryEx entry = entryEx(key); + GridCacheEntryEx entry = needEntry ? entryEx(key) : peekEx(key); + + if (entry == null) { + if (!skipVals && ctx.config().isStatisticsEnabled()) + ctx.cache().metrics0().onRead(false); + + break; + } try { T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(null, @@ -4389,11 +4436,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ @Nullable public V get(K key, boolean deserializePortable) throws IgniteCheckedException { - Map<K, V> map = getAllAsync(F.asList(key), deserializePortable).get(); - - assert map.isEmpty() || map.size() == 1 : map.size(); - - return map.get(key); + return getAsync(key, deserializePortable).get(); } /** @@ -4409,16 +4452,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return new GridFinishedFuture<>(e); } - return getAllAsync(Collections.singletonList(key), deserializePortable).chain( - new CX1<IgniteInternalFuture<Map<K, V>>, V>() { - @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { - Map<K, V> map = e.get(); - - assert map.isEmpty() || map.size() == 1 : map.size(); + String taskName = ctx.kernalContext().job().currentTaskName(); - return map.get(key); - } - }); + return getAsync(key, + !ctx.config().isReadFromBackup(), + /*skip tx*/false, + null, + taskName, + deserializePortable, + false, + /*can remap*/true); } /** @@ -4445,10 +4488,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return getAllAsync(keys, !ctx.config().isReadFromBackup(), /*skip tx*/false, - null, + /*subject id*/null, taskName, deserializePortable, - false, + /*skip vals*/false, /*can remap*/true); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java index be35c5c..359909e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java @@ -20,12 +20,18 @@ package org.apache.ignite.internal.processors.cache; import java.util.Collection; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; /** * Update future for atomic cache. */ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> { /** + * @return Future version. + */ + public GridCacheVersion version(); + + /** * Gets future that will be completed when it is safe when update is finished on the given version of topology. * * @param topVer Topology version to finish. http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java index caa3d3f..8bf8d40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java @@ -17,11 +17,8 @@ package org.apache.ignite.internal.processors.cache; -import java.util.Collection; import java.util.UUID; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.lang.IgniteUuid; /** @@ -34,16 +31,6 @@ public interface GridCacheFuture<R> extends IgniteInternalFuture<R> { public IgniteUuid futureId(); /** - * @return Future version. - */ - public GridCacheVersion version(); - - /** - * @return Involved nodes. - */ - public Collection<? extends ClusterNode> nodes(); - - /** * Callback for when node left. * * @param nodeId Left node ID. http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 2334780..9afbca8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -34,22 +34,24 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest; @@ -437,7 +439,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { case 50: { GridNearGetResponse res = (GridNearGetResponse)msg; - GridCacheFuture fut = ctx.mvcc().future(res.version(), res.futureId()); + CacheGetFuture fut = (CacheGetFuture)ctx.mvcc().future(res.futureId()); if (fut == null) { if (log.isDebugEnabled()) @@ -448,10 +450,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { res.error(res.classError()); - if (fut instanceof GridNearGetFuture) - ((GridNearGetFuture)fut).onResult(nodeId, res); - else - ((GridPartitionedGetFuture)fut).onResult(nodeId, res); + fut.onResult(nodeId, res); } break; @@ -521,6 +520,43 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { break; + case 116: { + GridNearSingleGetRequest req = (GridNearSingleGetRequest)msg; + + GridNearSingleGetResponse res = new GridNearSingleGetResponse( + ctx.cacheId(), + req.futureId(), + req.topologyVersion(), + null, + false, + req.deployInfo() != null); + + res.error(req.classError()); + + sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); + } + + break; + + case 117: { + GridNearSingleGetResponse res = (GridNearSingleGetResponse)msg; + + GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)ctx.mvcc().future(res.futureId()); + + if (fut == null) { + if (log.isDebugEnabled()) + log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']'); + + return; + } + + res.error(res.classError()); + + fut.onResult(nodeId, res); + } + + break; + default: throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message=" + msg + "]"); http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index bdd2118..61136bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -499,15 +499,21 @@ public abstract class GridCacheMessage implements Message { int size = col.size(); - for (int i = 0 ; i < size; i++) { - CacheObject obj = col.get(i); + for (int i = 0 ; i < size; i++) + prepareMarshalCacheObject(col.get(i), ctx); + } - if (obj != null) { - obj.prepareMarshal(ctx.cacheObjectContext()); + /** + * @param obj Object. + * @param ctx Context. + * @throws IgniteCheckedException If failed. + */ + protected final void prepareMarshalCacheObject(CacheObject obj, GridCacheContext ctx) throws IgniteCheckedException { + if (obj != null) { + obj.prepareMarshal(ctx.cacheObjectContext()); - if (addDepInfo) - prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx); - } + if (addDepInfo) + prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java index 67c1330..080a6f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java @@ -17,11 +17,18 @@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; + /** * Distributed future aware of MVCC locking. */ public interface GridCacheMvccFuture<T> extends GridCacheFuture<T> { /** + * @return Future version. + */ + public GridCacheVersion version(); + + /** * @param entry Entry which received new owner. * @param owner Owner. * @return {@code True} if future cares about this entry. http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index 8562f37..9104acb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -31,7 +30,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; @@ -101,12 +99,15 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { /** Active futures mapped by version ID. */ @GridToStringExclude - private final ConcurrentMap<GridCacheVersion, Collection<GridCacheFuture<?>>> futs = newMap(); + private final ConcurrentMap<GridCacheVersion, Collection<GridCacheMvccFuture<?>>> mvccFuts = newMap(); /** Pending atomic futures. */ private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap8<>(); + /** */ + private final ConcurrentMap<IgniteUuid, GridCacheFuture<?>> futs = new ConcurrentHashMap8<>(); + /** Near to DHT version mapping. */ private final ConcurrentMap<GridCacheVersion, GridCacheVersion> near2dht = newMap(); @@ -137,12 +138,12 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { prev + ']'); if (owner != null && (owner.local() || owner.nearLocal())) { - Collection<? extends GridCacheFuture> futCol = futs.get(owner.version()); + Collection<GridCacheMvccFuture<?>> futCol = mvccFuts.get(owner.version()); if (futCol != null) { synchronized (futCol) { - for (GridCacheFuture fut : futCol) { - if (fut instanceof GridCacheMvccFuture && !fut.isDone()) { + for (GridCacheMvccFuture<?> fut : futCol) { + if (!fut.isDone()) { GridCacheMvccFuture<Boolean> mvccFut = (GridCacheMvccFuture<Boolean>)fut; // Since this method is called outside of entry synchronization, @@ -206,18 +207,14 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { for (GridCacheFuture<?> fut : activeFutures()) fut.onNodeLeft(discoEvt.eventNode().id()); - for (IgniteInternalFuture<?> fut : atomicFuts.values()) { - if (fut instanceof GridCacheFuture) { - GridCacheFuture cacheFut = (GridCacheFuture)fut; + for (GridCacheAtomicFuture<?> cacheFut : atomicFuts.values()) { + cacheFut.onNodeLeft(discoEvt.eventNode().id()); - cacheFut.onNodeLeft(discoEvt.eventNode().id()); + if (cacheFut.isCancelled() || cacheFut.isDone()) { + GridCacheVersion futVer = cacheFut.version(); - if (cacheFut.isCancelled() || cacheFut.isDone()) { - GridCacheVersion futVer = cacheFut.version(); - - if (futVer != null) - atomicFuts.remove(futVer, fut); - } + if (futVer != null) + atomicFuts.remove(futVer, cacheFut); } } } @@ -261,12 +258,14 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { public Collection<GridCacheFuture<?>> activeFutures() { ArrayList<GridCacheFuture<?>> col = new ArrayList<>(); - for (Collection<GridCacheFuture<?>> verFuts : futs.values()) { - synchronized (verFuts) { - col.addAll(verFuts); + for (Collection<GridCacheMvccFuture<?>> futs : mvccFuts.values()) { + synchronized (futs) { + col.addAll(futs); } } + col.addAll(futs.values()); + return col; } @@ -420,13 +419,25 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** + * @param fut Future. + * @param futId Future ID. + */ + public void addFuture(final GridCacheFuture<?> fut, final IgniteUuid futId) { + GridCacheFuture<?> old = futs.put(futId, fut); + + assert old == null : old; + + onFutureAdded(fut); + } + + /** * Adds future. * * @param fut Future. * @return {@code True} if added. */ @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"}) - public boolean addFuture(final GridCacheFuture<?> fut) { + public boolean addFuture(final GridCacheMvccFuture<?> fut) { if (fut.isDone()) { fut.markNotTrackable(); @@ -437,10 +448,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { return true; while (true) { - Collection<GridCacheFuture<?>> old = futs.get(fut.version()); + Collection<GridCacheMvccFuture<?>> old = mvccFuts.get(fut.version()); if (old == null) { - Collection<GridCacheFuture<?>> col = new HashSet<GridCacheFuture<?>>(U.capacity(4), 0.75f) { + Collection<GridCacheMvccFuture<?>> col = new HashSet<GridCacheMvccFuture<?>>(U.capacity(4), 0.75f) { { // Make sure that we add future to queue before // adding queue to the map of futures. @@ -456,7 +467,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } }; - old = futs.putIfAbsent(fut.version(), col); + old = mvccFuts.putIfAbsent(fut.version(), col); } if (old != null) { @@ -471,7 +482,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { // Future is being removed, so we force-remove here and try again. if (empty) { - if (futs.remove(fut.version(), old)) { + if (mvccFuts.remove(fut.version(), old)) { if (log.isDebugEnabled()) log.debug("Removed future list from futures map for lock version: " + fut.version()); } @@ -501,16 +512,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { break; } - // Close window in case of node is gone before the future got added to - // the map of futures. - for (ClusterNode n : fut.nodes()) { - if (cctx.discovery().node(n.id()) == null) - fut.onNodeLeft(n.id()); - } - // Just in case if future was completed before it was added. if (fut.isDone()) - removeFuture(fut); + removeMvccFuture(fut); else onFutureAdded(fut); @@ -537,15 +541,22 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** + * @param futId Future ID. + */ + public void removeFuture(IgniteUuid futId) { + futs.remove(futId); + } + + /** * @param fut Future to remove. * @return {@code True} if removed. */ @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"}) - public boolean removeFuture(GridCacheFuture<?> fut) { + public boolean removeMvccFuture(GridCacheMvccFuture<?> fut) { if (!fut.trackable()) return true; - Collection<GridCacheFuture<?>> cur = futs.get(fut.version()); + Collection<GridCacheMvccFuture<?>> cur = mvccFuts.get(fut.version()); if (cur == null) return false; @@ -565,7 +576,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { else if (log.isDebugEnabled()) log.debug("Attempted to remove a non-registered future (has it been already removed?): " + fut); - if (empty && futs.remove(fut.version(), cur)) + if (empty && mvccFuts.remove(fut.version(), cur)) if (log.isDebugEnabled()) log.debug("Removed future list from futures map for lock version: " + fut.version()); @@ -580,12 +591,12 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * @return Future. */ @SuppressWarnings({"unchecked"}) - @Nullable public GridCacheFuture future(GridCacheVersion ver, IgniteUuid futId) { - Collection<? extends GridCacheFuture> futs = this.futs.get(ver); + @Nullable public GridCacheMvccFuture<?> mvccFuture(GridCacheVersion ver, IgniteUuid futId) { + Collection<GridCacheMvccFuture<?>> futs = this.mvccFuts.get(ver); if (futs != null) { synchronized (futs) { - for (GridCacheFuture<?> fut : futs) { + for (GridCacheMvccFuture<?> fut : futs) { if (fut.futureId().equals(futId)) { if (log.isDebugEnabled()) log.debug("Found future in futures map: " + fut); @@ -603,22 +614,11 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** - * Gets all futures for given lock version, possibly empty collection. - * - * @param ver Version. - * @return All futures for given lock version. + * @param futId Future ID. + * @return Found future. */ - @SuppressWarnings("unchecked") - public <T> Collection<? extends IgniteInternalFuture<T>> futures(GridCacheVersion ver) { - Collection c = futs.get(ver); - - if (c == null) - return Collections.<IgniteInternalFuture<T>>emptyList(); - else { - synchronized (c) { - return new ArrayList<>((Collection<IgniteInternalFuture<T>>)c); - } - } + @Nullable public GridCacheFuture future(IgniteUuid futId) { + return futs.get(futId); } /** @@ -913,7 +913,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { X.println(">>> Mvcc manager memory stats [grid=" + cctx.gridName() + ']'); X.println(">>> rmvLocksSize: " + rmvLocks.sizex()); X.println(">>> lockedSize: " + locked.size()); - X.println(">>> futsSize: " + futs.size()); + X.println(">>> futsSize: " + (mvccFuts.size() + futs.size())); X.println(">>> near2dhtSize: " + near2dht.size()); X.println(">>> finishFutsSize: " + finishFuts.sizex()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java index 01c4867..1648de0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java @@ -29,7 +29,6 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -389,16 +388,6 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea } /** {@inheritDoc} */ - @Override public GridCacheVersion version() { - return tx.xidVersion(); - } - - /** {@inheritDoc} */ - @Override public Collection<? extends ClusterNode> nodes() { - return nodes.values(); - } - - /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { for (IgniteInternalFuture<?> fut : futures()) if (isMini(fut)) { @@ -424,7 +413,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea /** {@inheritDoc} */ @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) { if (super.onDone(res, err)) { - cctx.mvcc().removeFuture(this); + cctx.mvcc().removeFuture(futId); if (err == null) { assert res != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java index 721ba4e..245ffc6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java @@ -18,9 +18,12 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; @@ -40,7 +43,7 @@ import static org.apache.ignite.IgniteSystemProperties.getInteger; * */ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoundIdentityFuture<Map<K, V>> - implements GridCacheFuture<Map<K, V>> { + implements GridCacheFuture<Map<K, V>>, CacheGetFuture { /** Default max remap count value. */ public static final int DFLT_MAX_REMAP_CNT = 3; @@ -155,4 +158,26 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun map.put(key, new T2<>(skipVals ? true : val, ver)); } + + /** + * Affinity node to send get request to. + * + * @param key Key to get. + * @param topVer Topology version. + * @return Affinity node to get key from. + */ + protected final ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) { + if (!canRemap) { + List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer); + + for (ClusterNode node : affNodes) { + if (cctx.discovery().alive(node)) + return node; + } + + return null; + } + else + return cctx.affinity().primary(key, topVer); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetFuture.java new file mode 100644 index 0000000..ebe2cff --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetFuture.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.UUID; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; + +/** + * + */ +public interface CacheGetFuture { + /** + * @param nodeId Node ID. + * @param res Response. + */ + public void onResult(UUID nodeId, GridNearGetResponse res); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index bdd1140..8537357 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -57,9 +57,12 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCa import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; +import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; import org.apache.ignite.internal.util.future.GridCompoundFuture; @@ -76,6 +79,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -110,6 +114,46 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } /** + * @param nodeId Sender node ID. + * @param res Near get response. + */ + protected final void processNearGetResponse(UUID nodeId, GridNearGetResponse res) { + if (log.isDebugEnabled()) + log.debug("Processing near get response [nodeId=" + nodeId + ", res=" + res + ']'); + + CacheGetFuture fut = (CacheGetFuture)ctx.mvcc().future(res.futureId()); + + if (fut == null) { + if (log.isDebugEnabled()) + log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']'); + + return; + } + + fut.onResult(nodeId, res); + } + + /** + * @param nodeId Sender node ID. + * @param res Near get response. + */ + protected void processNearSingleGetResponse(UUID nodeId, GridNearSingleGetResponse res) { + if (log.isDebugEnabled()) + log.debug("Processing near get response [nodeId=" + nodeId + ", res=" + res + ']'); + + GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)ctx.mvcc().future(res.futureId()); + + if (fut == null) { + if (log.isDebugEnabled()) + log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']'); + + return; + } + + fut.onResult(nodeId, res); + } + + /** * @param ctx Context. */ protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) { @@ -669,6 +713,103 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param nodeId Node ID. * @param req Get request. */ + protected void processNearSingleGetRequest(final UUID nodeId, final GridNearSingleGetRequest req) { + assert ctx.affinityNode(); + + long ttl = req.accessTtl(); + + final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(ttl); + + LinkedHashMap<KeyCacheObject, Boolean> map = U.newLinkedHashMap(1); + + map.put(req.key(), req.addReader()); + + IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut = + getDhtAsync(nodeId, + req.messageId(), + map, + req.readThrough(), + req.topologyVersion(), + req.subjectId(), + req.taskNameHash(), + expiryPlc, + req.skipValues()); + + fut.listen(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>>() { + @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> f) { + GridNearSingleGetResponse res; + + GridDhtFuture<Collection<GridCacheEntryInfo>> fut = + (GridDhtFuture<Collection<GridCacheEntryInfo>>)f; + + try { + Collection<GridCacheEntryInfo> entries = fut.get(); + + if (F.isEmpty(fut.invalidPartitions())) { + GridCacheEntryInfo info = F.first(entries); + + Message res0 = null; + + if (info != null) { + if (req.needEntryInfo()) { + info.key(null); + + res0 = info; + } else if (req.needVersion()) + res0 = new CacheVersionedValue(info.value(), info.version()); + else + res0 = info.value(); + } + + res = new GridNearSingleGetResponse(ctx.cacheId(), + req.futureId(), + req.topologyVersion(), + res0, + false, + req.addDeploymentInfo()); + + if (info != null && req.skipValues()) + res.setContainsValue(); + } + else { + res = new GridNearSingleGetResponse(ctx.cacheId(), + req.futureId(), + ctx.shared().exchange().readyAffinityVersion(), + null, + true, + req.addDeploymentInfo()); + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed processing get request: " + req, e); + + res = new GridNearSingleGetResponse(ctx.cacheId(), + req.futureId(), + req.topologyVersion(), + null, + false, + req.addDeploymentInfo()); + + res.error(e); + } + + try { + ctx.io().send(nodeId, res, ctx.ioPolicy()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId + + ",req=" + req + ", res=" + res + ']', e); + } + + sendTtlUpdateRequest(expiryPlc); + } + }); + } + + /** + * @param nodeId Node ID. + * @param req Get request. + */ protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest req) { assert ctx.affinityNode(); assert !req.reload() : req; http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 7284fd4..a7978c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -262,20 +262,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> log.debug("Added invalid partition to future [invalidPart=" + invalidPart + ", fut=" + this + ']'); } - /** - * @return Participating nodes. - */ - @Override public Collection<? extends ClusterNode> nodes() { - return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { - if (isMini(f)) - return ((MiniFuture)f).node(); - - return cctx.discovery().localNode(); - } - }); - } - /** {@inheritDoc} */ @Override public GridCacheVersion version() { return lockVer; @@ -756,7 +742,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> log.debug("Completing future: " + this); // Clean up. - cctx.mvcc().removeFuture(this); + cctx.mvcc().removeMvccFuture(this); if (timeoutObj != null) cctx.time().removeTimeoutObject(timeoutObj); http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 35f63e3..2468cf0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxRemote; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest; @@ -126,6 +127,12 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach } }); + ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() { + @Override public void apply(UUID nodeId, GridNearSingleGetRequest req) { + processNearSingleGetRequest(nodeId, req); + } + }); + ctx.io().addHandler(ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest>() { @Override public void apply(UUID nodeId, GridNearLockRequest req) { processNearLockRequest(nodeId, req); @@ -566,7 +573,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach private void processDhtLockResponse(UUID nodeId, GridDhtLockResponse res) { assert nodeId != null; assert res != null; - GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().<Boolean>future(res.version(), res.futureId()); + GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().<Boolean>mvccFuture(res.version(), res.futureId()); if (fut == null) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 992bd66..bb370a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -66,7 +66,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur private GridCacheSharedContext<K, V> cctx; /** Future ID. */ - private IgniteUuid futId; + private final IgniteUuid futId; /** Transaction. */ @GridToStringExclude @@ -115,26 +115,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur } /** {@inheritDoc} */ - @Override public GridCacheVersion version() { - return tx.xidVersion(); - } - - /** - * @return Involved nodes. - */ - @Override public Collection<? extends ClusterNode> nodes() { - return - F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { - if (isMini(f)) - return ((MiniFuture)f).node(); - - return cctx.discovery().localNode(); - } - }); - } - - /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { for (IgniteInternalFuture<?> fut : futures()) if (isMini(fut)) { @@ -228,7 +208,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur this.tx.sendFinishReply(commit, error()); // Don't forget to clean up. - cctx.mvcc().removeFuture(this); + cctx.mvcc().removeFuture(futId); return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 2bed931..f344d48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -490,7 +490,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*commit*/true); - cctx.mvcc().addFuture(fut); + cctx.mvcc().addFuture(fut, fut.futureId()); GridDhtTxPrepareFuture prep = prepFut.get(); @@ -580,7 +580,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*rollback*/false); - cctx.mvcc().addFuture(fut); + cctx.mvcc().addFuture(fut, fut.futureId()); if (prepFut == null) { try { http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index d081c0c..4cb5d05 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -243,21 +243,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter return tx.xidVersion(); } - /** - * @return Involved nodes. - */ - @Override public Collection<? extends ClusterNode> nodes() { - return - F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { - if (isMini(f)) - return ((MiniFuture)f).node(); - - return cctx.discovery().localNode(); - } - }); - } - /** {@inheritDoc} */ @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) { if (log.isDebugEnabled()) @@ -823,7 +808,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (super.onDone(res, err.get())) { // Don't forget to clean up. - cctx.mvcc().removeFuture(this); + cctx.mvcc().removeMvccFuture(this); return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index febe9ba..c3d9836 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; @@ -57,10 +56,11 @@ import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture.*; + /** * Colocated get future. */ @@ -71,15 +71,15 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda /** Logger reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + /** Dummy version sent to older nodes for backward compatibility, */ + private static final GridCacheVersion DUMMY_VER = new GridCacheVersion(0, 0, 0, 0); + /** Logger. */ private static IgniteLogger log; /** Topology version. */ private AffinityTopologyVersion topVer; - /** Version. */ - private GridCacheVersion ver; - /** * @param cctx Context. * @param keys Keys. @@ -126,8 +126,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda this.topVer = topVer; - ver = cctx.versions().next(); - if (log == null) log = U.logger(cctx.kernalContext(), logRef, GridPartitionedGetFuture.class); } @@ -160,25 +158,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda } /** {@inheritDoc} */ - @Override public GridCacheVersion version() { - return ver; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public Collection<? extends ClusterNode> nodes() { - return - F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<Map<K, V>>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteInternalFuture<Map<K, V>> f) { - if (isMini(f)) - return ((MiniFuture)f).node(); - - return cctx.discovery().localNode(); - } - }); - } - - /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { boolean found = false; @@ -219,7 +198,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda if (super.onDone(res, err)) { // Don't forget to clean up. if (trackable) - cctx.mvcc().removeFuture(this); + cctx.mvcc().removeFuture(futId); cache().sendTtlUpdateRequest(expiryPlc); @@ -274,9 +253,11 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda add(new GridFinishedFuture<>(locVals)); if (hasRmtNodes) { - trackable = true; + if (!trackable) { + trackable = true; - cctx.mvcc().addFuture(this); + cctx.mvcc().addFuture(this, futId); + } } // Create mini futures. @@ -343,7 +324,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda cctx.cacheId(), futId, fut.futureId(), - ver, + n.version().compareTo(SINGLE_GET_MSG_SINCE) >= 0 ? null : DUMMY_VER, mappedKeys, readThrough, topVer, @@ -390,7 +371,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda boolean remote = false; // Allow to get cached value from the local node. - boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(), key, topVer); + boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) || + cctx.affinity().primary(cctx.localNode(), key, topVer); while (true) { GridCacheEntryEx entry; @@ -521,28 +503,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda } /** - * Finds affinity node to send get request to. - * - * @param key Key to get. - * @param topVer Topology version. - * @return Affinity node from which the key will be requested. - */ - private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) { - if (!canRemap) { - List<ClusterNode> nodes = cctx.affinity().nodes(key, topVer); - - for (ClusterNode node : nodes) { - if (cctx.discovery().alive(node)) - return node; - } - - return null; - } - else - return cctx.affinity().primary(key, topVer); - } - - /** * @param infos Entry infos. * @return Result map. */ @@ -557,7 +517,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda if (needVer) versionedResult(map, info.key(), info.value(), info.version()); - else + else { cctx.addResult(map, info.key(), info.value(), @@ -565,6 +525,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda keepCacheObjects, deserializePortable, false); + } } return map;
