Repository: ignite Updated Branches: refs/heads/master 74d342d66 -> 18aaee039
http://git-wip-us.apache.org/repos/asf/ignite/blob/18aaee03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 54c3cae..09445f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -23,10 +23,9 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -38,8 +37,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; -import org.apache.ignite.internal.processors.cache.GridCacheUtils.BackupPostProcessingClosure; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.CacheDistributedGetFutureAdapter; @@ -50,12 +47,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridFinishedFuture; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.C1; -import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -66,17 +58,8 @@ import org.jetbrains.annotations.Nullable; * */ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Logger reference. */ - private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); - - /** Logger. */ - private static IgniteLogger log; - /** Transaction. */ - private IgniteTxLocalEx tx; + private final IgniteTxLocalEx tx; /** */ private GridCacheVersion ver; @@ -111,7 +94,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap boolean keepCacheObjects, boolean recovery ) { - super(cctx, + super( + cctx, keys, readThrough, forcePrimary, @@ -122,18 +106,16 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap skipVals, needVer, keepCacheObjects, - recovery); + recovery + ); assert !F.isEmpty(keys); this.tx = tx; - futId = IgniteUuid.randomUuid(); - ver = tx == null ? cctx.versions().next() : tx.xidVersion(); - if (log == null) - log = U.logger(cctx.kernalContext(), logRef, GridNearGetFuture.class); + initLogger(GridNearGetFuture.class); } /** @@ -147,74 +129,22 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap if (lockedTopVer != null) { canRemap = false; - map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), lockedTopVer); + map(keys, Collections.emptyMap(), lockedTopVer); } else { AffinityTopologyVersion mapTopVer = topVer; if (mapTopVer == null) { - mapTopVer = tx == null ? - (canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion()) : - tx.topologyVersion(); + mapTopVer = tx == null ? cctx.affinity().affinityTopologyVersion() : tx.topologyVersion(); } - map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), mapTopVer); + map(keys, Collections.emptyMap(), mapTopVer); } markInitialized(); } /** {@inheritDoc} */ - @Override public boolean trackable() { - return trackable; - } - - /** {@inheritDoc} */ - @Override public void markNotTrackable() { - // Should not flip trackable flag from true to false since get future can be remapped. - } - - /** {@inheritDoc} */ - @Override public IgniteUuid futureId() { - return futId; - } - - /** {@inheritDoc} */ - @Override public boolean onNodeLeft(UUID nodeId) { - boolean found = false; - - for (IgniteInternalFuture<Map<K, V>> fut : futures()) - if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; - - if (f.node().id().equals(nodeId)) { - found = true; - - f.onNodeLeft(); - } - } - - return found; - } - - /** - * @param nodeId Sender. - * @param res Result. - */ - @Override public void onResult(UUID nodeId, GridNearGetResponse res) { - for (IgniteInternalFuture<Map<K, V>> fut : futures()) - if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; - - if (f.futureId().equals(res.miniId())) { - assert f.node().id().equals(nodeId); - - f.onResult(res); - } - } - } - - /** {@inheritDoc} */ @Override public boolean onDone(Map<K, V> res, Throwable err) { if (super.onDone(res, err)) { // Don't forget to clean up. @@ -229,11 +159,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap return false; } - /** - * @param f Future. - * @return {@code True} if mini-future. - */ - private boolean isMini(IgniteInternalFuture<?> f) { + /** {@inheritDoc} */ + @Override protected boolean isMini(IgniteInternalFuture<?> f) { return f.getClass().equals(MiniFuture.class); } @@ -242,10 +169,10 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap * @param mapped Mappings to check for duplicates. * @param topVer Topology version to map on. */ - private void map( + @Override protected void map( Collection<KeyCacheObject> keys, Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped, - final AffinityTopologyVersion topVer + AffinityTopologyVersion topVer ) { Collection<ClusterNode> affNodes = CU.affinityNodes(cctx, topVer); @@ -268,7 +195,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap try { // Assign keys to primary nodes. for (KeyCacheObject key : keys) - savedEntries = map(key, mappings, topVer, mapped, savedEntries); + savedEntries = map(key, topVer, mappings, mapped, savedEntries); success = true; } @@ -292,23 +219,24 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap if (isDone()) return; - final Map<KeyCacheObject, GridNearCacheEntry> saved = savedEntries != null ? savedEntries : - Collections.<KeyCacheObject, GridNearCacheEntry>emptyMap(); + Map<KeyCacheObject, GridNearCacheEntry> saved = + savedEntries != null ? savedEntries : Collections.emptyMap(); - final int keysSize = keys.size(); + int keysSize = keys.size(); // Create mini futures. for (Map.Entry<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> entry : mappings.entrySet()) { - final ClusterNode n = entry.getKey(); + ClusterNode n = entry.getKey(); - final LinkedHashMap<KeyCacheObject, Boolean> mappedKeys = entry.getValue(); + LinkedHashMap<KeyCacheObject, Boolean> mappedKeys = entry.getValue(); assert !mappedKeys.isEmpty(); // If this is the primary or backup node for the keys. if (n.isLocal()) { - final GridDhtFuture<Collection<GridCacheEntryInfo>> fut = - dht().getDhtAsync(n.id(), + GridDhtFuture<Collection<GridCacheEntryInfo>> fut = dht() + .getDhtAsync( + n.id(), -1, mappedKeys, false, @@ -320,74 +248,52 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap skipVals, recovery, null, - null); // TODO IGNITE-7371 + null + ); // TODO IGNITE-7371 - final Collection<Integer> invalidParts = fut.invalidPartitions(); + Collection<Integer> invalidParts = fut.invalidPartitions(); if (!F.isEmpty(invalidParts)) { Collection<KeyCacheObject> remapKeys = new ArrayList<>(keysSize); for (KeyCacheObject key : keys) { - if (key != null && invalidParts.contains(cctx.affinity().partition(key))) + int part = cctx.affinity().partition(key); + + if (key != null && invalidParts.contains(part)) { + addNodeAsInvalid(n, part, topVer); + remapKeys.add(key); + } } AffinityTopologyVersion updTopVer = cctx.shared().exchange().readyAffinityVersion(); - assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " + - "not change [topVer=" + topVer + ", updTopVer=" + updTopVer + - ", invalidParts=" + invalidParts + ']'; - // Remap recursively. map(remapKeys, mappings, updTopVer); } // Add new future. - add(fut.chain(new C1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>, Map<K, V>>() { - @Override public Map<K, V> apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut) { - try { - return loadEntries(n.id(), mappedKeys.keySet(), fut.get(), saved, topVer); - } - catch (Exception e) { - U.error(log, "Failed to get values from dht cache [fut=" + fut + "]", e); + add(fut.chain(f -> { + try { + return loadEntries(n.id(), mappedKeys.keySet(), f.get(), saved, topVer); + } + catch (Exception e) { + U.error(log, "Failed to get values from dht cache [fut=" + fut + "]", e); - onDone(e); + onDone(e); - return Collections.emptyMap(); - } + return Collections.emptyMap(); } })); } else { - if (!trackable) { - trackable = true; + registrateFutureInMvccManager(this); - cctx.mvcc().addFuture(this, futId); - } + MiniFuture miniFuture = new MiniFuture(n, mappedKeys, saved, topVer); - MiniFuture fut = new MiniFuture(n, mappedKeys, saved, topVer, - CU.createBackupPostProcessingClosure(topVer, log, cctx, null, expiryPlc, readThrough, skipVals)); - - GridCacheMessage req = new GridNearGetRequest( - cctx.cacheId(), - futId, - fut.futureId(), - ver, - mappedKeys, - readThrough, - topVer, - subjId, - taskName == null ? 0 : taskName.hashCode(), - expiryPlc != null ? expiryPlc.forCreate() : -1L, - expiryPlc != null ? expiryPlc.forAccess() : -1L, - true, - skipVals, - cctx.deploymentEnabled(), - recovery, - null, - null); // TODO IGNITE-7371 - - add(fut); // Append new future. + GridNearGetRequest req = miniFuture.createGetRequest(futId); + + add(miniFuture); // Append new future. try { cctx.io().send(n, req, cctx.ioPolicy()); @@ -395,16 +301,14 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap catch (IgniteCheckedException e) { // Fail the whole thing. if (e instanceof ClusterTopologyCheckedException) - fut.onNodeLeft(); + miniFuture.onNodeLeft((ClusterTopologyCheckedException)e); else - fut.onResult(e); + miniFuture.onResult(e); } } } } - - /** * @param mappings Mappings. * @param key Key to map. @@ -415,8 +319,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap */ private Map<KeyCacheObject, GridNearCacheEntry> map( KeyCacheObject key, - Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings, AffinityTopologyVersion topVer, + Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings, Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped, Map<KeyCacheObject, GridNearCacheEntry> saved ) { @@ -494,7 +398,9 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap } } - ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, part, canRemap); + Set<ClusterNode> invalidNodesSet = getInvalidNodes(part, topVer); + + ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, invalidNodesSet, part, canRemap); if (affNode == null) { onDone(serverNotFoundError(part, topVer)); @@ -505,17 +411,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap if (cctx.statisticsEnabled() && !skipVals && !affNode.isLocal() && !isNear) cache().metrics0().onRead(false); - LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(affNode); - - if (keys != null && keys.containsKey(key)) { - if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) { - onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " + - MAX_REMAP_CNT + " attempts (key got remapped to the same node) " + - "[key=" + key + ", node=" + U.toShortString(affNode) + ", mappings=" + mapped + ']')); - - return saved; - } - } + if (!checkRetryPermits(key,affNode,mapped)) + return saved; if (!affNodes.contains(cctx.localNode())) { GridNearCacheEntry nearEntry = entry != null ? entry : near.entryExx(key, topVer); @@ -572,10 +469,12 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap * @param nearRead {@code True} if already tried to read from near cache. * @return {@code True} if there is no need to further search value. */ - private boolean localDhtGet(KeyCacheObject key, + private boolean localDhtGet( + KeyCacheObject key, int part, AffinityTopologyVersion topVer, - boolean nearRead) { + boolean nearRead + ) { GridDhtCacheAdapter<K, V> dht = cache().dht(); assert dht.context().affinityNode() : this; @@ -795,9 +694,11 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap * @param saved Saved entries. * @param topVer Topology version. */ - private void releaseEvictions(Collection<KeyCacheObject> keys, + private void releaseEvictions( + Collection<KeyCacheObject> keys, Map<KeyCacheObject, GridNearCacheEntry> saved, - AffinityTopologyVersion topVer) { + AffinityTopologyVersion topVer + ) { for (KeyCacheObject key : keys) { GridNearCacheEntry entry = saved.get(key); @@ -812,101 +713,59 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap /** {@inheritDoc} */ @Override public String toString() { - Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { - @SuppressWarnings("unchecked") - @Override public String apply(IgniteInternalFuture<?> f) { - if (isMini(f)) { - return "[node=" + ((MiniFuture)f).node().id() + - ", loc=" + ((MiniFuture)f).node().isLocal() + - ", done=" + f.isDone() + "]"; - } - else - return "[loc=true, done=" + f.isDone() + "]"; - } - }); - return S.toString(GridNearGetFuture.class, this, - "innerFuts", futs, "super", super.toString()); } /** - * Mini-future for get operations. Mini-futures are only waiting on a single - * node as opposed to multiple nodes. + * Mini-future for get operations. Mini-futures are only waiting on a single node as opposed to multiple nodes. */ - private class MiniFuture extends GridFutureAdapter<Map<K, V>> { - /** */ - private final IgniteUuid futId = IgniteUuid.randomUuid(); - - /** Node ID. */ - private ClusterNode node; - - /** Keys. */ - @GridToStringInclude - private LinkedHashMap<KeyCacheObject, Boolean> keys; - + private class MiniFuture extends AbstractMiniFuture { /** Saved entry versions. */ - private Map<KeyCacheObject, GridNearCacheEntry> savedEntries; - - /** Topology version on which this future was mapped. */ - private AffinityTopologyVersion topVer; - - /** Post processing closure. */ - private final BackupPostProcessingClosure postProcessingClos; - - /** {@code True} if remapped after node left. */ - private boolean remapped; + private final Map<KeyCacheObject, GridNearCacheEntry> savedEntries; /** * @param node Node. * @param keys Keys. * @param savedEntries Saved entries. * @param topVer Topology version. - * @param postProcessingClos Post processing closure. */ MiniFuture( ClusterNode node, LinkedHashMap<KeyCacheObject, Boolean> keys, Map<KeyCacheObject, GridNearCacheEntry> savedEntries, - AffinityTopologyVersion topVer, - BackupPostProcessingClosure postProcessingClos) { - this.node = node; - this.keys = keys; + AffinityTopologyVersion topVer + ) { + super(node, keys, topVer); this.savedEntries = savedEntries; - this.topVer = topVer; - this.postProcessingClos = postProcessingClos; } - /** - * @return Future ID. - */ - IgniteUuid futureId() { - return futId; - } - - /** - * @return Node ID. - */ - public ClusterNode node() { - return node; - } - - /** - * @return Keys. - */ - public Collection<KeyCacheObject> keys() { - return keys.keySet(); + /** {@inheritDoc} */ + @Override protected GridNearGetRequest createGetRequest0(IgniteUuid rootFutId, IgniteUuid futId) { + return new GridNearGetRequest( + cctx.cacheId(), + rootFutId, + futId, + ver, + keys, + readThrough, + topVer, + subjId, + taskName == null ? 0 : taskName.hashCode(), + expiryPlc != null ? expiryPlc.forCreate() : -1L, + expiryPlc != null ? expiryPlc.forAccess() : -1L, + true, + skipVals, + cctx.deploymentEnabled(), + recovery, + null, + null + ); // TODO IGNITE-7371 } - /** - * @param e Error. - */ - void onResult(Throwable e) { - if (log.isDebugEnabled()) - log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); - - // Fail. - onDone(e); + /** {@inheritDoc} */ + @Override protected Map<K, V> createResultMap(Collection<GridCacheEntryInfo> entries) { + return loadEntries(node.id(), keys.keySet(), entries, savedEntries, topVer); } /** {@inheritDoc} */ @@ -920,136 +779,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap return false; } - /** - */ - synchronized void onNodeLeft() { - if (remapped) - return; - - remapped = true; - - if (log.isDebugEnabled()) - log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this); - - // Try getting value from alive nodes. - if (!canRemap) { - // Remap - map(keys.keySet(), F.t(node, keys), topVer); - - onDone(Collections.<K, V>emptyMap()); - } - else { - AffinityTopologyVersion updTopVer = - new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); - - cctx.shared().exchange().affinityReadyFuture(updTopVer).listen(f -> { - try { - // Remap. - map(keys.keySet(), F.t(node, keys), f.get()); - - onDone(Collections.<K, V>emptyMap()); - - } - catch (IgniteCheckedException e) { - GridNearGetFuture.this.onDone(e); - } - }); - } - } - - /** - * @param res Result callback. - */ - void onResult(final GridNearGetResponse res) { - final Collection<Integer> invalidParts = res.invalidPartitions(); - - // If error happened on remote node, fail the whole future. - if (res.error() != null) { - onDone(res.error()); - - return; - } - - // Remap invalid partitions. - if (!F.isEmpty(invalidParts)) { - AffinityTopologyVersion rmtTopVer = res.topologyVersion(); - - assert rmtTopVer.topologyVersion() != 0; - - if (rmtTopVer.compareTo(topVer) <= 0) { - // Fail the whole get future. - onDone(new IgniteCheckedException("Failed to process invalid partitions response (remote node reported " + - "invalid partitions but remote topology version does not differ from local) " + - "[topVer=" + topVer + ", rmtTopVer=" + rmtTopVer + ", invalidParts=" + invalidParts + - ", nodeId=" + node.id() + ']')); - - return; - } - - if (log.isDebugEnabled()) - log.debug("Remapping mini get future [invalidParts=" + invalidParts + ", fut=" + this + ']'); - - if (!canRemap) { - map(F.view(keys.keySet(), new P1<KeyCacheObject>() { - @Override public boolean apply(KeyCacheObject key) { - return invalidParts.contains(cctx.affinity().partition(key)); - } - }), F.t(node, keys), topVer); - - postProcessResultAndDone(res); - - return; - } - - // Need to wait for next topology version to remap. - IgniteInternalFuture<AffinityTopologyVersion> topFut = cctx.shared().exchange().affinityReadyFuture(rmtTopVer); - - topFut.listen(new CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void applyx( - IgniteInternalFuture<AffinityTopologyVersion> fut) throws IgniteCheckedException { - AffinityTopologyVersion readyTopVer = fut.get(); - - // This will append new futures to compound list. - map(F.view(keys.keySet(), new P1<KeyCacheObject>() { - @Override public boolean apply(KeyCacheObject key) { - return invalidParts.contains(cctx.affinity().partition(key)); - } - }), F.t(node, keys), readyTopVer); - - postProcessResultAndDone(res); - } - }); - } - else - postProcessResultAndDone(res); - - } - - /** - * Post processes result and done future. - * - * @param res Response. - */ - private void postProcessResultAndDone(final GridNearGetResponse res){ - try { - postProcessResult(res); - - // It is critical to call onDone after adding futures to compound list. - onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedEntries, topVer)); - } - catch (Exception ex) { - onDone(ex); - } - } - - /** - * @param res Response. - */ - private void postProcessResult(final GridNearGetResponse res) { - if (postProcessingClos != null) - postProcessingClos.apply(res.entries()); - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(MiniFuture.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/18aaee03/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetReadFromBackupFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetReadFromBackupFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetReadFromBackupFailoverTest.java new file mode 100644 index 0000000..71cb4ed --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetReadFromBackupFailoverTest.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteIllegalStateException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.AbstractFailureHandler; +import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.internal.NodeStoppingException; +import org.apache.ignite.internal.processors.cache.GridCacheFuture; +import org.apache.ignite.internal.processors.cache.GridCacheMvccManager; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Assert; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Test for getting values on unstable topology with read from backup enabled. + */ +public class CacheGetReadFromBackupFailoverTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + /** Tx cache name. */ + private static final String TX_CACHE = "txCache"; + /** Atomic cache name. */ + private static final String ATOMIC_CACHE = "atomicCache"; + /** Keys count. */ + private static final int KEYS_CNT = 50000; + /** Stop load flag. */ + private static final AtomicBoolean stop = new AtomicBoolean(); + /** Error. */ + private static final AtomicReference<Throwable> err = new AtomicReference<>(); + + /** + * @return Grid count. + */ + public int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setFailureHandler(new AbstractFailureHandler() { + @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) { + err.compareAndSet(null, failureCtx.error()); + stop.set(true); + return false; + } + }); + + cfg.setConsistentId(igniteInstanceName); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + + CacheConfiguration<Long, Long> txCcfg = new CacheConfiguration<Long, Long>(TX_CACHE) + .setAtomicityMode(TRANSACTIONAL) + .setCacheMode(PARTITIONED) + .setBackups(1) + .setWriteSynchronizationMode(FULL_SYNC) + .setReadFromBackup(true); + + CacheConfiguration<Long, Long> atomicCcfg = new CacheConfiguration<Long, Long>(ATOMIC_CACHE) + .setAtomicityMode(ATOMIC) + .setCacheMode(PARTITIONED) + .setBackups(1) + .setWriteSynchronizationMode(FULL_SYNC) + .setReadFromBackup(true); + + cfg.setCacheConfiguration(txCcfg, atomicCcfg); + + // Enforce different mac adresses to emulate distributed environment by default. + cfg.setUserAttributes(Collections.singletonMap( + IgniteNodeAttributes.ATTR_MACS_OVERRIDE, UUID.randomUUID().toString())); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stop.set(false); + + err.set(null); + + startGrids(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testFailover() throws Exception { + Ignite ignite = ignite(0); + + ignite.cluster().active(true); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + try (IgniteDataStreamer<Object, Object> stmr = ignite.dataStreamer(TX_CACHE)) { + for (int i = 0; i < KEYS_CNT; i++) + stmr.addData(i, rnd.nextLong()); + } + + try (IgniteDataStreamer<Object, Object> stmr = ignite.dataStreamer(ATOMIC_CACHE)) { + for (int i = 0; i < KEYS_CNT; i++) + stmr.addData(i, rnd.nextLong()); + } + + AtomicInteger idx = new AtomicInteger(-1); + + AtomicInteger successGet = new AtomicInteger(); + + IgniteInternalFuture fut = GridTestUtils.runAsync(() -> { + ThreadLocalRandom rnd0 = ThreadLocalRandom.current(); + + while (!stop.get()) { + Ignite ig = null; + + while (ig == null) { + int n = rnd0.nextInt(gridCount()); + + if (idx.get() != n) { + try { + ig = ignite(n); + } + catch (IgniteIllegalStateException e) { + // No-op. + } + } + } + + try { + if (rnd.nextBoolean()) { + ig.cache(TX_CACHE).get(rnd0.nextLong(KEYS_CNT)); + ig.cache(ATOMIC_CACHE).get(rnd0.nextLong(KEYS_CNT)); + } + else { + ig.cache(TX_CACHE).getAll(rnd.longs(16, 0, KEYS_CNT).boxed().collect(Collectors.toSet())); + ig.cache(ATOMIC_CACHE).getAll(rnd.longs(16, 0, KEYS_CNT).boxed().collect(Collectors.toSet())); + } + + successGet.incrementAndGet(); + } + catch (CacheException e) { + if (!X.hasCause(e, NodeStoppingException.class)) + throw e; + } + + } + }, "load-thread"); + + long startTime = System.currentTimeMillis(); + + while (System.currentTimeMillis() - startTime < 30 * 1000L) { + int idx0 = idx.get(); + + if (idx0 >= 0) + startGrid(idx0); + + U.sleep(500); + + int next = rnd.nextInt(gridCount()); + + idx.set(next); + + stopGrid(next); + + U.sleep(500); + } + + stop.set(true); + + while (true){ + try { + fut.get(10_000); + + break; + } + catch (IgniteFutureTimeoutCheckedException e) { + for (Ignite i : G.allGrids()) { + IgniteEx ex = (IgniteEx)i; + + log.info(">>>> " + ex.context().localNodeId()); + + GridCacheMvccManager mvcc = ex.context().cache().context().mvcc(); + + for (GridCacheFuture<?> fut0 : mvcc.activeFutures()) { + log.info("activeFut - " + fut0); + } + + for (GridCacheFuture<?> fut0 : mvcc.atomicFutures()) { + log.info("atomicFut - " + fut0); + } + } + } + } + + Assert.assertTrue(String.valueOf(successGet.get()), successGet.get() > 50); + + Throwable e = err.get(); + + if (e != null) { + log.error("Test failed", e); + + fail("Test failed"); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/18aaee03/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index 0250d58..e367aad 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -66,6 +66,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCacheTransfor import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheClientNodeChangingTopologyTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheClientNodePartitionsExchangeTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheServerNodeConcurrentStart; +import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetReadFromBackupFailoverTest; import org.apache.ignite.internal.processors.cache.distributed.dht.CachePartitionPartialCountersMapSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColocatedOptimisticTransactionSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColocatedPreloadRestartSelfTest; @@ -246,6 +247,7 @@ public class IgniteCacheTestSuite2 extends TestSuite { GridTestUtils.addTestIfNeeded(suite, CacheLoadingConcurrentGridStartSelfTestAllowOverwrite.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CacheTxLoadingConcurrentGridStartSelfTestAllowOverwrite.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridPartitionedBackupLoadSelfTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, CacheGetReadFromBackupFailoverTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridCachePartitionedLoadCacheSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridCachePartitionedEventSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridCachePartitionNotLoadedEventSelfTest.class, ignoredTests);
