ignite-1811 Optimized cache 'get' on affinity node. (cherry picked from commit 83b2bf5)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4b31f4e6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4b31f4e6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4b31f4e6 Branch: refs/heads/ignite-1.5.4 Commit: 4b31f4e66c15003cee866a6e5660257346c16c1c Parents: 4ba6574 Author: sboikov <sboi...@gridgain.com> Authored: Mon Jan 18 18:05:37 2016 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Jan 18 18:16:05 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 30 +- .../processors/cache/GridCacheContext.java | 33 ++ .../dht/CacheDistributedGetFutureAdapter.java | 28 +- .../dht/GridClientPartitionTopology.java | 2 + .../dht/GridDhtPartitionTopologyImpl.java | 27 +- .../dht/GridPartitionedGetFuture.java | 241 ++++++----- .../dht/GridPartitionedSingleGetFuture.java | 229 ++++++---- .../dht/atomic/GridDhtAtomicCache.java | 26 ++ .../distributed/near/GridNearGetFuture.java | 267 +++++++----- .../cache/transactions/IgniteTxManager.java | 18 +- .../internal/TestRecordingCommunicationSpi.java | 157 +++++++ ...idCacheConfigurationConsistencySelfTest.java | 58 +-- .../cache/IgniteCacheNearLockValueSelfTest.java | 62 +-- ...eDynamicCacheStartNoExchangeTimeoutTest.java | 7 + ...ridCachePartitionNotLoadedEventSelfTest.java | 7 +- .../IgniteCacheAtomicNodeRestartTest.java | 2 + ...niteCacheClientNodeChangingTopologyTest.java | 4 +- .../distributed/IgniteCacheGetRestartTest.java | 280 ++++++++++++ .../IgniteCacheReadFromBackupTest.java | 427 +++++++++++++++++++ .../IgniteCacheSingleGetMessageTest.java | 88 +--- .../IgniteCrossCacheTxStoreSelfTest.java | 1 + .../GridCacheDhtPreloadMessageCountTest.java | 62 +-- .../near/GridCacheGetStoreErrorSelfTest.java | 9 +- .../GridCachePartitionedNodeRestartTest.java | 4 +- ...ePartitionedOptimisticTxNodeRestartTest.java | 4 +- .../IgniteCacheRestartTestSuite2.java | 3 + .../testsuites/IgniteCacheTestSuite4.java | 2 + 27 files changed, 1512 insertions(+), 566 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4b31f4e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 5d4c386..2582e6c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -4540,9 +4540,33 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @return Cached value. * @throws IgniteCheckedException If failed. */ - @Nullable public V get(K key, boolean deserializeBinary) - throws IgniteCheckedException { - return getAsync(key, deserializeBinary).get(); + @Nullable public V get(K key, boolean deserializeBinary) throws IgniteCheckedException { + checkJta(); + + String taskName = ctx.kernalContext().job().currentTaskName(); + + return get(key, taskName, deserializeBinary); + } + + /** + * @param key Key. + * @param taskName Task name. + * @param deserializeBinary Deserialize binary flag. + * @return Cached value. + * @throws IgniteCheckedException If failed. + */ + protected V get( + final K key, + String taskName, + boolean deserializeBinary) throws IgniteCheckedException { + return getAsync(key, + !ctx.config().isReadFromBackup(), + /*skip tx*/false, + null, + taskName, + deserializeBinary, + false, + /*can remap*/true).get(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/4b31f4e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index c10ebf3..fc48b9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -111,6 +111,7 @@ import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES; import static org.apache.ignite.cache.CacheRebalanceMode.NONE; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; /** * Cache context. @@ -1434,6 +1435,13 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @return {@code True} if store and read-through mode are enabled in configuration. + */ + public boolean readThroughConfigured() { + return store().configured() && cacheCfg.isReadThrough(); + } + + /** * @return {@code True} if {@link CacheConfiguration#isLoadPreviousValue()} flag is set. */ public boolean loadPreviousValue() { @@ -1961,6 +1969,31 @@ public class GridCacheContext<K, V> implements Externalizable { }); } + /** + * @param part Partition. + * @param affNodes Affinity nodes. + * @param topVer Topology version. + * @return {@code True} if cache 'get' operation is allowed to get entry locally. + */ + public boolean allowFastLocalRead(int part, List<ClusterNode> affNodes, AffinityTopologyVersion topVer) { + return affinityNode() && rebalanceEnabled() && hasPartition(part, affNodes, topVer); + } + + /** + * @param part Partition. + * @param affNodes Affinity nodes. + * @param topVer Topology version. + * @return {@code True} if partition is available locally. + */ + private boolean hasPartition(int part, List<ClusterNode> affNodes, AffinityTopologyVersion topVer) { + assert affinityNode(); + + GridDhtPartitionTopology top = topology(); + + return (top.rebalanceFinished(topVer) && (isReplicated() || affNodes.contains(locNode))) + || (top.partitionState(localNodeId(), part) == OWNING); + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeString(out, gridName()); http://git-wip-us.apache.org/repos/asf/ignite/blob/4b31f4e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java index cfbc21b..5dc5e98 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheFuture; @@ -38,6 +39,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS; import static org.apache.ignite.IgniteSystemProperties.getInteger; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; /** * @@ -162,14 +164,11 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun /** * Affinity node to send get request to. * - * @param key Key to get. - * @param topVer Topology version. + * @param affNodes All affinity nodes. * @return Affinity node to get key from. */ - protected final ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) { + protected final ClusterNode affinityNode(List<ClusterNode> affNodes) { if (!canRemap) { - List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer); - for (ClusterNode node : affNodes) { if (cctx.discovery().alive(node)) return node; @@ -178,6 +177,23 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun return null; } else - return cctx.affinity().primary(key, topVer); + return affNodes.get(0); + } + + /** + * @param part Partition. + * @return {@code True} if partition is in owned state. + */ + protected final boolean partitionOwned(int part) { + return cctx.topology().partitionState(cctx.localNodeId(), part) == OWNING; + } + + /** + * @param topVer Topology version. + * @return Exception. + */ + protected final ClusterTopologyServerNotFoundException serverNotFoundError(AffinityTopologyVersion topVer) { + return new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4b31f4e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 8aef5ad..dcfc038 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -882,6 +882,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) { + assert false : "Should not be called on non-affinity node"; + return false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/4b31f4e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index a0709c5..2ab8a12 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -88,7 +88,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private GridDhtPartitionExchangeId lastExchangeId; /** */ - private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE; + private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE; /** */ private volatile boolean stopping; @@ -136,9 +136,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { topReadyFut = null; - topVer = AffinityTopologyVersion.NONE; - rebalancedTopVer = AffinityTopologyVersion.NONE; + + topVer = AffinityTopologyVersion.NONE; } finally { lock.writeLock().unlock(); @@ -223,13 +223,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { this.stopping = stopping; - topVer = exchId.topologyVersion(); - updateSeq.setIfGreater(updSeq); topReadyFut = exchFut; rebalancedTopVer = AffinityTopologyVersion.NONE; + + topVer = exchId.topologyVersion(); } finally { lock.writeLock().unlock(); @@ -238,17 +238,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public AffinityTopologyVersion topologyVersion() { - lock.readLock().lock(); + AffinityTopologyVersion topVer = this.topVer; - try { - assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer + - ", cacheName=" + cctx.name() + ']'; + assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer + + ", cacheName=" + cctx.name() + ']'; - return topVer; - } - finally { - lock.readLock().unlock(); - } + return topVer; } /** {@inheritDoc} */ @@ -1336,7 +1331,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) { - return topVer.equals(rebalancedTopVer); + AffinityTopologyVersion curTopVer = this.topVer; + + return curTopVer.equals(topVer) && curTopVer.equals(rebalancedTopVer); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/4b31f4e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index e8aaca0..30b19e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; @@ -234,15 +235,16 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped, AffinityTopologyVersion topVer ) { - if (CU.affinityNodes(cctx, topVer).isEmpty()) { + Collection<ClusterNode> cacheNodes = CU.affinityNodes(cctx, topVer); + + if (cacheNodes.isEmpty()) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']')); return; } - Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings = - U.newHashMap(CU.affinityNodes(cctx, topVer).size()); + Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings = U.newHashMap(cacheNodes.size()); final int keysSize = keys.size(); @@ -374,135 +376,160 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda AffinityTopologyVersion topVer, Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped ) { - GridDhtCacheAdapter<K, V> colocated = cache(); + int part = cctx.affinity().partition(key); - boolean remote = false; + List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer); - // Allow to get cached value from the local node. - boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) || - cctx.affinity().primary(cctx.localNode(), key, topVer); + if (affNodes.isEmpty()) { + onDone(serverNotFoundError(topVer)); - while (true) { - GridCacheEntryEx entry; + return false; + } - try { - if (allowLocRead) { - try { - entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) : - colocated.peekEx(key); - - // If our DHT cache do has value, then we peek it. - if (entry != null) { - boolean isNew = entry.isNewLocked(); - - CacheObject v = null; - GridCacheVersion ver = null; - - if (needVer) { - T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( - null, - /*swap*/true, - /*unmarshal*/true, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiryPlc, - !deserializeBinary); - - if (res != null) { - v = res.get1(); - ver = res.get2(); - } - } - else { - v = entry.innerGet(null, - /*swap*/true, - /*read-through*/false, - /*fail-fast*/true, - /*unmarshal*/true, - /**update-metrics*/false, - /*event*/!skipVals, - /*temporary*/false, - subjId, - null, - taskName, - expiryPlc, - !deserializeBinary); - } + boolean fastLocGet = (!forcePrimary || affNodes.get(0).isLocal()) && + cctx.allowFastLocalRead(part, affNodes, topVer); - colocated.context().evicts().touch(entry, topVer); + if (fastLocGet && localGet(key, part, locVals)) + return false; - // Entry was not in memory or in swap, so we remove it from cache. - if (v == null) { - if (isNew && entry.markObsoleteIfEmpty(ver)) - colocated.removeIfObsolete(key); - } - else { - if (needVer) - versionedResult(locVals, key, v, ver); - else - cctx.addResult(locVals, - key, - v, - skipVals, - keepCacheObjects, - deserializeBinary, - true); - - return false; - } - } - } - catch (GridDhtInvalidPartitionException ignored) { - // No-op. - } - } + ClusterNode node = affinityNode(affNodes); - ClusterNode node = affinityNode(key, topVer); + if (node == null) { + onDone(serverNotFoundError(topVer)); - if (node == null) { - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid).")); + return false; + } - return false; - } + boolean remote = !node.isLocal(); + + LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(node); + + if (keys != null && keys.containsKey(key)) { + if (remapCnt.incrementAndGet() > MAX_REMAP_CNT) { + onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " + + MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" + + U.toShortString(node) + ", mappings=" + mapped + ']')); + + return false; + } + } + + LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(node); + + if (old == null) + mappings.put(node, old = new LinkedHashMap<>(3, 1f)); + + old.put(key, false); + + return remote; + } - remote = !node.isLocal(); + /** + * @param key Key. + * @param part Partition. + * @param locVals Local values. + * @return {@code True} if there is no need to further search value. + */ + private boolean localGet(KeyCacheObject key, int part, Map<K, V> locVals) { + assert cctx.affinityNode() : this; - LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(node); + GridDhtCacheAdapter<K, V> cache = cache(); - if (keys != null && keys.containsKey(key)) { - if (remapCnt.incrementAndGet() > MAX_REMAP_CNT) { - onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " + - MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" + - U.toShortString(node) + ", mappings=" + mapped + ']')); + while (true) { + GridCacheEntryEx entry; - return false; + try { + entry = cache.context().isSwapOrOffheapEnabled() ? cache.entryEx(key) : cache.peekEx(key); + + // If our DHT cache do has value, then we peek it. + if (entry != null) { + boolean isNew = entry.isNewLocked(); + + CacheObject v = null; + GridCacheVersion ver = null; + + if (needVer) { + T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + null, + /*swap*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!skipVals, + subjId, + null, + taskName, + expiryPlc, + !deserializeBinary); + + if (res != null) { + v = res.get1(); + ver = res.get2(); + } + } + else { + v = entry.innerGet(null, + /*swap*/true, + /*read-through*/false, + /*fail-fast*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!skipVals, + /*temporary*/false, + subjId, + null, + taskName, + expiryPlc, + !deserializeBinary); + } + + cache.context().evicts().touch(entry, topVer); + + // Entry was not in memory or in swap, so we remove it from cache. + if (v == null) { + if (isNew && entry.markObsoleteIfEmpty(ver)) + cache.removeIfObsolete(key); + } + else { + if (needVer) + versionedResult(locVals, key, v, ver); + else { + cctx.addResult(locVals, + key, + v, + skipVals, + keepCacheObjects, + deserializeBinary, + true); + } + + return true; } } - LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(node); + boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion()); - if (old == null) - mappings.put(node, old = new LinkedHashMap<>(3, 1f)); + // Entry not found, do not continue search if topology did not change and there is no store. + if (!cctx.readThroughConfigured() && (topStable || partitionOwned(part))) { + if (!skipVals && cctx.config().isStatisticsEnabled()) + cache.metrics0().onRead(false); - old.put(key, false); + return true; + } - break; + return false; + } + catch (GridCacheEntryRemovedException ignored) { + // No-op, will retry. + } + catch (GridDhtInvalidPartitionException ignored) { + return false; } catch (IgniteCheckedException e) { onDone(e); - break; - } - catch (GridCacheEntryRemovedException ignored) { - // No-op, will retry. + return true; } } - - return remote; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/4b31f4e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index 29971fd..0c811ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -58,6 +58,8 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; + /** * */ @@ -319,105 +321,140 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im * @return Primary node or {@code null} if future was completed. */ @Nullable private ClusterNode mapKeyToNode(AffinityTopologyVersion topVer) { - ClusterNode primary = affinityNode(key, topVer); + int part = cctx.affinity().partition(key); + + List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer); - if (primary == null) { - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']')); + if (affNodes.isEmpty()) { + onDone(serverNotFoundError(topVer)); return null; } - boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) || primary.isLocal(); - - if (allowLocRead) { - GridDhtCacheAdapter colocated = cctx.dht(); - - while (true) { - GridCacheEntryEx entry; - - try { - entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) : - colocated.peekEx(key); - - // If our DHT cache do has value, then we peek it. - if (entry != null) { - boolean isNew = entry.isNewLocked(); - - CacheObject v = null; - GridCacheVersion ver = null; - - if (needVer) { - T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( - null, - /*swap*/true, - /*unmarshal*/true, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiryPlc, - true); - - if (res != null) { - v = res.get1(); - ver = res.get2(); - } - } - else { - v = entry.innerGet(null, - /*swap*/true, - /*read-through*/false, - /*fail-fast*/true, - /*unmarshal*/true, - /**update-metrics*/false, - /*event*/!skipVals, - /*temporary*/false, - subjId, - null, - taskName, - expiryPlc, - true); - } + boolean fastLocGet = (!forcePrimary || affNodes.get(0).isLocal()) && + cctx.allowFastLocalRead(part, affNodes, topVer); - colocated.context().evicts().touch(entry, topVer); + if (fastLocGet && localGet(topVer, part)) + return null; - // Entry was not in memory or in swap, so we remove it from cache. - if (v == null) { - if (isNew && entry.markObsoleteIfEmpty(ver)) - colocated.removeIfObsolete(key); - } - else { - if (!skipVals && cctx.config().isStatisticsEnabled()) - cctx.cache().metrics0().onRead(true); + ClusterNode affNode = affinityNode(affNodes); + + if (affNode == null) { + onDone(serverNotFoundError(topVer)); + + return null; + } + + return affNode; + } + + /** + * @param topVer Topology version. + * @param part Partition. + * @return {@code True} if future completed. + */ + private boolean localGet(AffinityTopologyVersion topVer, int part) { + assert cctx.affinityNode() : this; + + GridDhtCacheAdapter colocated = cctx.dht(); - if (!skipVals) - setResult(v, ver); - else - setSkipValueResult(true, ver); + while (true) { + GridCacheEntryEx entry; - return null; + try { + entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) : + colocated.peekEx(key); + + // If our DHT cache do has value, then we peek it. + if (entry != null) { + boolean isNew = entry.isNewLocked(); + + CacheObject v = null; + GridCacheVersion ver = null; + + if (needVer) { + T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + null, + /*swap*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!skipVals, + subjId, + null, + taskName, + expiryPlc, + true); + + if (res != null) { + v = res.get1(); + ver = res.get2(); } } + else { + v = entry.innerGet(null, + /*swap*/true, + /*read-through*/false, + /*fail-fast*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!skipVals, + /*temporary*/false, + subjId, + null, + taskName, + expiryPlc, + true); + } - break; - } - catch (GridDhtInvalidPartitionException ignored) { - break; - } - catch (IgniteCheckedException e) { - onDone(e); + colocated.context().evicts().touch(entry, topVer); + + // Entry was not in memory or in swap, so we remove it from cache. + if (v == null) { + if (isNew && entry.markObsoleteIfEmpty(ver)) + colocated.removeIfObsolete(key); + } + else { + if (!skipVals && cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onRead(true); + + if (!skipVals) + setResult(v, ver); + else + setSkipValueResult(true, ver); - return null; + return true; + } } - catch (GridCacheEntryRemovedException ignored) { - // No-op, will retry. + + boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion()); + + // Entry not found, complete future with null result if topology did not change and there is no store. + if (!cctx.readThroughConfigured() && (topStable || partitionOwned(part))) { + if (!skipVals && cctx.config().isStatisticsEnabled()) + colocated.metrics0().onRead(false); + + if (skipVals) + setSkipValueResult(false, null); + else + setResult(null, null); + + return true; } + + return false; } - } + catch (GridCacheEntryRemovedException ignored) { + // No-op, will retry. + } + catch (GridDhtInvalidPartitionException ignored) { + return false; + } + catch (IgniteCheckedException e) { + onDone(e); - return primary; + return true; + } + } } /** @@ -595,7 +632,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im } else { if (!keepCacheObjects) { - Object res = cctx.unwrapBinaryIfNeeded(val, !deserializeBinary && !skipVals); + Object res = cctx.unwrapBinaryIfNeeded(val, !deserializeBinary); onDone(res); } @@ -612,16 +649,30 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im } /** + * @param part Partition. + * @return {@code True} if partition is in owned state. + */ + private boolean partitionOwned(int part) { + return cctx.topology().partitionState(cctx.localNodeId(), part) == OWNING; + } + + /** + * @param topVer Topology version. + * @return Exception. + */ + private ClusterTopologyServerNotFoundException serverNotFoundError(AffinityTopologyVersion topVer) { + return new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'); + } + + /** * Affinity node to send get request to. * - * @param key Key to get. - * @param topVer Topology version. + * @param affNodes All affinity nodes. * @return Affinity node to get key from. */ - private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) { + @Nullable private ClusterNode affinityNode(List<ClusterNode> affNodes) { if (!canRemap) { - List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer); - for (ClusterNode node : affNodes) { if (cctx.discovery().alive(node)) return node; @@ -630,7 +681,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im return null; } else - return cctx.affinity().primary(key, topVer); + return affNodes.get(0); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/4b31f4e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 393413e..81fd5d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -317,6 +317,32 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ + @Override protected V get(K key, String taskName, boolean deserializeBinary) throws IgniteCheckedException { + ctx.checkSecurity(SecurityPermission.CACHE_READ); + + if (keyCheck) + validateCacheKey(key); + + CacheOperationContext opCtx = ctx.operationContextPerCall(); + + UUID subjId = ctx.subjectIdPerCall(null, opCtx); + + final ExpiryPolicy expiryPlc = opCtx != null ? opCtx.expiry() : null; + + final boolean skipStore = opCtx != null && opCtx.skipStore(); + + return getAsync0(ctx.toCacheKeyObject(key), + !ctx.config().isReadFromBackup(), + subjId, + taskName, + deserializeBinary, + expiryPlc, + false, + skipStore, + true).get(); + } + + /** {@inheritDoc} */ @Override protected IgniteInternalFuture<V> getAsync(final K key, final boolean forcePrimary, final boolean skipTx, http://git-wip-us.apache.org/repos/asf/ignite/blob/4b31f4e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index a121af9..dd6e8d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; @@ -405,10 +406,20 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped, Map<KeyCacheObject, GridNearCacheEntry> saved ) { + int part = cctx.affinity().partition(key); + + List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer); + + if (affNodes.isEmpty()) { + onDone(serverNotFoundError(topVer)); + + return null; + } + final GridNearCacheAdapter near = cache(); // Allow to get cached value from the local node. - boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(), key, topVer); + boolean allowLocRead = !forcePrimary || cctx.localNode().equals(affNodes.get(0)); while (true) { GridNearCacheEntry entry = allowLocRead ? (GridNearCacheEntry)near.peekEx(key) : null; @@ -456,124 +467,23 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap } } - ClusterNode affNode = null; - - if (v == null && allowLocRead && cctx.affinityNode()) { - GridDhtCacheAdapter<K, V> dht = cache().dht(); - - GridCacheEntryEx dhtEntry = null; - - try { - dhtEntry = dht.context().isSwapOrOffheapEnabled() ? dht.entryEx(key) : dht.peekEx(key); - - // If near cache does not have value, then we peek DHT cache. - if (dhtEntry != null) { - boolean isNew = dhtEntry.isNewLocked() || !dhtEntry.valid(topVer); - - if (needVer) { - T2<CacheObject, GridCacheVersion> res = dhtEntry.innerGetVersioned( - null, - /*swap*/true, - /*unmarshal*/true, - /**update-metrics*/false, - /*event*/!isNear && !skipVals, - subjId, - null, - taskName, - expiryPlc, - !deserializeBinary); - - if (res != null) { - v = res.get1(); - ver = res.get2(); - } - } - else { - v = dhtEntry.innerGet(tx, - /*swap*/true, - /*read-through*/false, - /*fail-fast*/true, - /*unmarshal*/true, - /*update-metrics*/false, - /*events*/!isNear && !skipVals, - /*temporary*/false, - subjId, - null, - taskName, - expiryPlc, - !deserializeBinary); - } - - // Entry was not in memory or in swap, so we remove it from cache. - if (v == null && isNew && dhtEntry.markObsoleteIfEmpty(ver)) - dht.removeIfObsolete(key); - } - - if (v != null) { - if (cctx.cache().configuration().isStatisticsEnabled() && !skipVals) - near.metrics0().onRead(true); - } - else { - affNode = affinityNode(key, topVer); - - if (affNode == null) { - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid).")); - - return saved; - } - - if (!affNode.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals) - near.metrics0().onRead(false); - } - } - catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) { - // No-op. - } - finally { - if (dhtEntry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))) { - dht.context().evicts().touch(dhtEntry, topVer); - - entry = null; - } - } - } - - if (v != null) { - if (needVer) { - V val0 = (V)new T2<>(skipVals ? true : v, ver); + if (v == null) { + boolean fastLocGet = allowLocRead && cctx.allowFastLocalRead(part, affNodes, topVer); - add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0))); - } - else { - if (keepCacheObjects) { - K key0 = (K)key; - V val0 = (V)(skipVals ? true : v); + if (fastLocGet && localDhtGet(key, part, topVer, isNear)) + break; - add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); - } - else { - K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false); - V val0 = !skipVals ? - (V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false) : - (V)Boolean.TRUE; + ClusterNode affNode = affinityNode(affNodes); - add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); - } - } - } - else { if (affNode == null) { - affNode = affinityNode(key, topVer); - - if (affNode == null) { - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid).")); + onDone(serverNotFoundError(topVer)); - return saved; - } + return saved; } + if (cctx.cache().configuration().isStatisticsEnabled() && !skipVals && !affNode.isLocal()) + cache().metrics0().onRead(false); + LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(affNode); if (keys != null && keys.containsKey(key)) { @@ -586,7 +496,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap } } - if (!cctx.affinity().localNode(key, topVer)) { + if (!affNodes.contains(cctx.localNode())) { GridNearCacheEntry nearEntry = entry != null ? entry : near.entryExx(key, topVer); nearEntry.reserveEviction(); @@ -612,6 +522,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap old.put(key, addRdr); } + else + addResult(key, v, ver); break; } @@ -633,6 +545,135 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap } /** + * @param key Key. + * @param part Partition. + * @param topVer Topology version. + * @param nearRead {@code True} if already tried to read from near cache. + * @return {@code True} if there is no need to further search value. + */ + private boolean localDhtGet(KeyCacheObject key, + int part, + AffinityTopologyVersion topVer, + boolean nearRead) { + GridDhtCacheAdapter<K, V> dht = cache().dht(); + + assert dht.context().affinityNode() : this; + + while (true) { + GridCacheEntryEx dhtEntry = null; + + try { + dhtEntry = dht.context().isSwapOrOffheapEnabled() ? dht.entryEx(key) : dht.peekEx(key); + + CacheObject v = null; + + // If near cache does not have value, then we peek DHT cache. + if (dhtEntry != null) { + boolean isNew = dhtEntry.isNewLocked() || !dhtEntry.valid(topVer); + + if (needVer) { + T2<CacheObject, GridCacheVersion> res = dhtEntry.innerGetVersioned( + null, + /*swap*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!nearRead && !skipVals, + subjId, + null, + taskName, + expiryPlc, + !deserializeBinary); + + if (res != null) { + v = res.get1(); + ver = res.get2(); + } + } + else { + v = dhtEntry.innerGet(tx, + /*swap*/true, + /*read-through*/false, + /*fail-fast*/true, + /*unmarshal*/true, + /*update-metrics*/false, + /*events*/!nearRead && !skipVals, + /*temporary*/false, + subjId, + null, + taskName, + expiryPlc, + !deserializeBinary); + } + + // Entry was not in memory or in swap, so we remove it from cache. + if (v == null && isNew && dhtEntry.markObsoleteIfEmpty(ver)) + dht.removeIfObsolete(key); + } + + if (v != null) { + if (cctx.cache().configuration().isStatisticsEnabled() && !skipVals) + cache().metrics0().onRead(true); + + addResult(key, v, ver); + + return true; + } + else { + boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion()); + + // Entry not found, do not continue search if topology did not change and there is no store. + return !cctx.readThroughConfigured() && (topStable || partitionOwned(part)); + } + } + catch (GridCacheEntryRemovedException ignored) { + // Retry. + } + catch (GridDhtInvalidPartitionException e) { + return false; + } + catch (IgniteCheckedException e) { + onDone(e); + + return false; + } + finally { + if (dhtEntry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))) + dht.context().evicts().touch(dhtEntry, topVer); + } + } + } + + /** + * @param key Key. + * @param v Value. + * @param ver Version. + */ + @SuppressWarnings("unchecked") + private void addResult(KeyCacheObject key, CacheObject v, GridCacheVersion ver) { + if (needVer) { + V val0 = (V)new T2<>(skipVals ? true : v, ver); + + add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0))); + } + else { + if (keepCacheObjects) { + K key0 = (K)key; + V val0 = (V)(skipVals ? true : v); + + add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); + } + else { + K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false); + V val0 = !skipVals ? + (V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false) : + (V)Boolean.TRUE; + + add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); + } + } + } + + /** * @return Near cache. */ private GridNearCacheAdapter<K, V> cache() { http://git-wip-us.apache.org/repos/asf/ignite/blob/4b31f4e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index d384e4e..818ac11 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -619,17 +619,19 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return topVer; } - for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) { - if (!cacheCtx.systemTx()) - continue; + if (!sysThreadMap.isEmpty()) { + for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) { + if (!cacheCtx.systemTx()) + continue; - tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId())); + tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId())); - if (tx != null && tx != ignore) { - AffinityTopologyVersion topVer = tx.topologyVersionSnapshot(); + if (tx != null && tx != ignore) { + AffinityTopologyVersion topVer = tx.topologyVersionSnapshot(); - if (topVer != null) - return topVer; + if (topVer != null) + return topVer; + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4b31f4e6/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java new file mode 100644 index 0000000..8a602ad --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME; + +/** + * + */ +public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { + /** */ + private Class<?> recordCls; + + /** */ + private List<Object> recordedMsgs = new ArrayList<>(); + + /** */ + private List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>(); + + /** */ + private Map<Class<?>, Set<String>> blockCls = new HashMap<>(); + + /** */ + private IgnitePredicate<GridIoMessage> blockP; + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) + throws IgniteSpiException { + if (msg instanceof GridIoMessage) { + GridIoMessage ioMsg = (GridIoMessage)msg; + + Object msg0 = ioMsg.message(); + + synchronized (this) { + if (recordCls != null && msg0.getClass().equals(recordCls)) + recordedMsgs.add(msg0); + + boolean block = false; + + if (blockP != null && blockP.apply(ioMsg)) + block = true; + else { + Set<String> blockNodes = blockCls.get(msg0.getClass()); + + if (blockNodes != null) { + String nodeName = (String)node.attributes().get(ATTR_GRID_NAME); + + block = blockNodes.contains(nodeName); + } + } + + if (block) { + blockedMsgs.add(new T2<>(node, ioMsg)); + + return; + } + } + } + + super.sendMessage(node, msg, ackC); + } + + /** + * @param recordCls Message class to record. + */ + public void record(@Nullable Class<?> recordCls) { + synchronized (this) { + this.recordCls = recordCls; + } + } + + /** + * @return Recorded messages. + */ + public List<Object> recordedMessages() { + synchronized (this) { + List<Object> msgs = recordedMsgs; + + recordedMsgs = new ArrayList<>(); + + return msgs; + } + } + + /** + * @param blockP Message block predicate. + */ + public void blockMessages(IgnitePredicate<GridIoMessage> blockP) { + synchronized (this) { + this.blockP = blockP; + } + } + + /** + * @param cls Message class. + * @param nodeName Node name. + */ + public void blockMessages(Class<?> cls, String nodeName) { + synchronized (this) { + Set<String> set = blockCls.get(cls); + + if (set == null) { + set = new HashSet<>(); + + blockCls.put(cls, set); + } + + set.add(nodeName); + } + } + + /** + * Stops block messages and sends all already blocked messages. + */ + public void stopBlock() { + synchronized (this) { + blockCls.clear(); + + for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) + super.sendMessage(msg.get1(), msg.get2()); + + blockedMsgs.clear(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4b31f4e6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java index e28e89f..a1f917f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java @@ -19,25 +19,19 @@ package org.apache.ignite.internal.processors.cache; import java.io.Externalizable; import java.io.Serializable; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; import java.util.concurrent.Callable; import javax.cache.Cache; -import javax.cache.integration.CacheLoaderException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheInterceptorAdapter; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cache.affinity.AffinityNodeIdHashResolver; -import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.eviction.EvictionFilter; import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy; import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy; import org.apache.ignite.cache.eviction.random.RandomEvictionPolicy; -import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DeploymentMode; @@ -46,7 +40,6 @@ import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -54,7 +47,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridStringLogger; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; @@ -862,49 +854,9 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac }, IgniteCheckedException.class, null); } - /** */ - private static class TestStore implements CacheStore<Object,Object> { - /** {@inheritDoc} */ - @Nullable @Override public Object load(Object key) { - return null; - } - - /** {@inheritDoc} */ - @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, @Nullable Object... args) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public Map<Object, Object> loadAll(Iterable<?> keys) throws CacheLoaderException { - return Collections.emptyMap(); - } - - /** {@inheritDoc} */ - @Override public void write(Cache.Entry<?, ?> entry) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void delete(Object key) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void deleteAll(Collection<?> keys) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void sessionEnd(boolean commit) { - // No-op. - } - } - + /** + * + */ private static class TestRendezvousAffinityFunction extends RendezvousAffinityFunction { /** * Empty constructor required by {@link Externalizable}. @@ -941,6 +893,10 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac // No-op, just different class. } + /** + * + */ private static class TestCacheDefaultAffinityKeyMapper extends GridCacheDefaultAffinityKeyMapper { + // No-op, just different class. } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/4b31f4e6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java index 100acfe..f106fec 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java @@ -18,22 +18,15 @@ package org.apache.ignite.internal.processors.cache; import java.util.Collection; -import java.util.concurrent.ConcurrentLinkedDeque; import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest; -import org.apache.ignite.lang.IgniteInClosure; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.spi.IgniteSpiException; -import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -71,7 +64,11 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest { if (getTestGridName(0).equals(gridName)) cfg.setClientMode(true); - cfg.setCommunicationSpi(new TestCommunicationSpi()); + TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi(); + + commSpi.record(GridNearLockRequest.class); + + cfg.setCommunicationSpi(commSpi); return cfg; } @@ -88,18 +85,18 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest { cache.put("key1", "val1"); for (int i = 0; i < 3; i++) { - ((TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi()).clear(); - ((TestCommunicationSpi)ignite(1).configuration().getCommunicationSpi()).clear(); - try (Transaction tx = ignite(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { cache.get("key1"); tx.commit(); } - TestCommunicationSpi comm = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi(); + TestRecordingCommunicationSpi comm = + (TestRecordingCommunicationSpi)ignite(0).configuration().getCommunicationSpi(); + + Collection<GridNearLockRequest> reqs = (Collection)comm.recordedMessages(); - assertEquals(1, comm.requests().size()); + assertEquals(1, reqs.size()); GridCacheAdapter<Object, Object> primary = ((IgniteKernal)grid(1)).internalCache("partitioned"); @@ -107,7 +104,7 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest { assertNotNull(dhtEntry); - GridNearLockRequest req = comm.requests().iterator().next(); + GridNearLockRequest req = reqs.iterator().next(); assertEquals(dhtEntry.version(), req.dhtVersion(0)); @@ -122,39 +119,4 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest { } } } - - /** - * - */ - private static class TestCommunicationSpi extends TcpCommunicationSpi { - /** */ - private Collection<GridNearLockRequest> reqs = new ConcurrentLinkedDeque<>(); - - /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) - throws IgniteSpiException { - if (msg instanceof GridIoMessage) { - GridIoMessage ioMsg = (GridIoMessage)msg; - - if (ioMsg.message() instanceof GridNearLockRequest) - reqs.add((GridNearLockRequest)ioMsg.message()); - } - - super.sendMessage(node, msg, ackC); - } - - /** - * @return Collected requests. - */ - public Collection<GridNearLockRequest> requests() { - return reqs; - } - - /** - * - */ - public void clear() { - reqs.clear(); - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/4b31f4e6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java index 9acc4b5..ac80d69 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java @@ -46,6 +46,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; /** * @@ -344,6 +345,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst ccfg.setName("cache-1"); ccfg.setAtomicityMode(ATOMIC); ccfg.setBackups(0); + ccfg.setWriteSynchronizationMode(FULL_SYNC); res.add(ccfg); } @@ -354,6 +356,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst ccfg.setName("cache-2"); ccfg.setAtomicityMode(ATOMIC); ccfg.setBackups(1); + ccfg.setWriteSynchronizationMode(FULL_SYNC); res.add(ccfg); } @@ -365,6 +368,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst ccfg.setAtomicityMode(ATOMIC); ccfg.setBackups(1); ccfg.setAffinity(new FairAffinityFunction()); + ccfg.setWriteSynchronizationMode(FULL_SYNC); res.add(ccfg); } @@ -375,6 +379,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst ccfg.setName("cache-4"); ccfg.setAtomicityMode(TRANSACTIONAL); ccfg.setBackups(0); + ccfg.setWriteSynchronizationMode(FULL_SYNC); res.add(ccfg); } @@ -385,6 +390,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst ccfg.setName("cache-5"); ccfg.setAtomicityMode(TRANSACTIONAL); ccfg.setBackups(1); + ccfg.setWriteSynchronizationMode(FULL_SYNC); res.add(ccfg); } @@ -396,6 +402,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst ccfg.setAtomicityMode(TRANSACTIONAL); ccfg.setBackups(1); ccfg.setAffinity(new FairAffinityFunction()); + ccfg.setWriteSynchronizationMode(FULL_SYNC); res.add(ccfg); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4b31f4e6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java index 5bc779c..6a42752 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java @@ -22,7 +22,6 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -42,6 +41,9 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.util.TestTcpCommunicationSpi; import org.eclipse.jetty.util.ConcurrentHashSet; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + /** * */ @@ -76,8 +78,9 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(); - cacheCfg.setCacheMode(CacheMode.PARTITIONED); + cacheCfg.setCacheMode(PARTITIONED); cacheCfg.setBackups(backupCnt); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); cfg.setCacheConfiguration(cacheCfg); http://git-wip-us.apache.org/repos/asf/ignite/blob/4b31f4e6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeRestartTest.java index 327db0e..37ed866 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeRestartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeRestartTest.java @@ -31,10 +31,12 @@ public class IgniteCacheAtomicNodeRestartTest extends GridCachePartitionedNodeRe return ATOMIC; } + /** {@inheritDoc} */ @Override public void testRestartWithPutFourNodesNoBackups() { fail("https://issues.apache.org/jira/browse/IGNITE-1587"); } + /** {@inheritDoc} */ @Override public void testRestartWithPutFourNodesOneBackupsOffheapTiered() { fail("https://issues.apache.org/jira/browse/IGNITE-1587"); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4b31f4e6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java index e7657a6..13f2598 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java @@ -2010,7 +2010,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac private List<Object> recordedMsgs = new ArrayList<>(); /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { if (msg instanceof GridIoMessage) { Object msg0 = ((GridIoMessage)msg).message(); @@ -2032,7 +2032,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac } } - super.sendMessage(node, msg, ackClosure); + super.sendMessage(node, msg, ackC); } /**