Repository: ignite Updated Branches: refs/heads/ignite-9720 28a76b3bb -> a6cd973a2
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java new file mode 100644 index 0000000..f7fe9cb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage; +import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastRequest; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; + +/** + * Represents partition update counters delivery to remote nodes. + */ +public class PartitionCountersNeighborcastFuture extends GridCacheCompoundIdentityFuture<Void> { + /** */ + private final IgniteUuid futId = IgniteUuid.randomUuid(); + /** */ + private boolean trackable = true; + /** */ + private final GridCacheSharedContext<?, ?> cctx; + /** */ + private final IgniteInternalTx tx; + /** */ + private final IgniteLogger log; + + /** */ + public PartitionCountersNeighborcastFuture( + IgniteInternalTx tx, GridCacheSharedContext<?, ?> cctx) { + super(null); + + this.tx = tx; + + this.cctx = cctx; + + log = cctx.logger(CU.TX_MSG_RECOVERY_LOG_CATEGORY); + } + + /** + * Starts processing. + */ + public void init() { + if (log.isInfoEnabled()) { + log.info("Starting delivery partition countres to remote nodes [txId=" + tx.nearXidVersion() + + ", futId=" + futId); + } + + HashSet<UUID> siblings = siblingBackups(); + + cctx.mvcc().addFuture(this, futId); + + for (UUID peer : siblings) { + List<PartitionUpdateCountersMessage> cntrs = cctx.tm().txHandler() + .filterUpdateCountersForBackupNode(tx, cctx.node(peer)); + + if (F.isEmpty(cntrs)) + continue; + + MiniFuture miniFut = new MiniFuture(peer); + + try { + cctx.io().send(peer, new PartitionCountersNeighborcastRequest(cntrs, futId), SYSTEM_POOL); + + add(miniFut); + } + catch (IgniteCheckedException e) { + if (!(e instanceof ClusterTopologyCheckedException)) + log.warning("Failed to send partition counters to remote node [node=" + peer + ']', e); + else + logNodeLeft(peer); + + miniFut.onDone(); + } + } + + markInitialized(); + } + + /** */ + private HashSet<UUID> siblingBackups() { + Map<UUID, Collection<UUID>> txNodes = tx.transactionNodes(); + + assert txNodes != null; + + UUID locNodeId = cctx.localNodeId(); + + HashSet<UUID> siblings = new HashSet<>(); + + txNodes.values().stream() + .filter(backups -> backups.contains(locNodeId)) + .forEach(siblings::addAll); + + siblings.remove(locNodeId); + + return siblings; + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) { + boolean comp = super.onDone(res, err); + + if (comp) + cctx.mvcc().removeFuture(futId); + + return comp; + } + + /** + * Processes a response from a remote peer. Completes a mini future for that peer. + * + * @param nodeId Remote peer node id. + */ + public void onResult(UUID nodeId) { + if (log.isInfoEnabled()) + log.info("Remote peer acked partition counters delivery [futId=" + futId + + ", node=" + nodeId + ']'); + + completeMini(nodeId); + } + + /** {@inheritDoc} */ + @Override public boolean onNodeLeft(UUID nodeId) { + logNodeLeft(nodeId); + + // if a left node is one of remote peers then a mini future for it is completed successfully + completeMini(nodeId); + + return true; + } + + /** */ + private void completeMini(UUID nodeId) { + for (IgniteInternalFuture<?> fut : futures()) { + assert fut instanceof MiniFuture; + + MiniFuture mini = (MiniFuture)fut; + + if (mini.nodeId.equals(nodeId)) { + cctx.kernalContext().closure().runLocalSafe(mini::onDone); + + break; + } + } + } + + /** */ + private void logNodeLeft(UUID nodeId) { + if (log.isInfoEnabled()) { + log.info("Failed during partition counters delivery to remote node. " + + "Node left cluster (will ignore) [futId=" + futId + + ", node=" + nodeId + ']'); + } + } + + /** {@inheritDoc} */ + @Override public IgniteUuid futureId() { + return futId; + } + + /** {@inheritDoc} */ + @Override public boolean trackable() { + return trackable; + } + + /** {@inheritDoc} */ + @Override public void markNotTrackable() { + trackable = false; + } + + /** + * Component of compound parent future. Represents interaction with one of remote peers. + */ + private static class MiniFuture extends GridFutureAdapter<Void> { + /** */ + private final UUID nodeId; + + /** */ + private MiniFuture(UUID nodeId) { + this.nodeId = nodeId; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java index e1a0bd6..550ec09 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage; +import org.jetbrains.annotations.Nullable; /** * Values which should be tracked during transaction execution and applied on commit. @@ -69,7 +70,7 @@ public class TxCounters { /** * @return Final update counters. */ - public Collection<PartitionUpdateCountersMessage> updateCounters() { + @Nullable public Collection<PartitionUpdateCountersMessage> updateCounters() { return updCntrs; } http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 4569f65..fbfd99b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -1423,7 +1423,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (cache != null && !cache.isLocal() && cache.context().userCache()) req.addUpdateCounters(ctx.localNodeId(), - toCountersMap(cache.context().topology().localUpdateCounters(false))); + toCountersMap(cache.context().topology().localUpdateCounters(false, false))); } } @@ -1564,7 +1564,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (cache != null && !cache.isLocal() && cache.context().userCache()) { CachePartitionPartialCountersMap cntrsMap = - cache.context().topology().localUpdateCounters(false); + cache.context().topology().localUpdateCounters(false, false); cntrs = U.marshal(marsh, cntrsMap); } @@ -2504,7 +2504,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode()) cntrsPerNode.put(ctx.localNodeId(), - toCountersMap(cctx.topology().localUpdateCounters(false))); + toCountersMap(cctx.topology().localUpdateCounters(false, false))); routine.handler().updateCounters(topVer, cntrsPerNode, cntrs); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java index 8bdfafe..a7880a2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java @@ -314,4 +314,4 @@ public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri private boolean ignoredMessage(GridIoMessage msg) { return ignoreMsgCls != null && ignoreMsgCls.isAssignableFrom(msg.message().getClass()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java index 7514555..3f55e9c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java @@ -23,10 +23,12 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.stream.StreamSupport; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; @@ -40,6 +42,8 @@ import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; @@ -58,6 +62,7 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.transactions.Transaction; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; /** @@ -268,8 +273,10 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest assertNotNull(cache); - assertEquals("Failed to check entry value on node: " + checkNodeId, - fullFailure ? initVal : val, cache.localPeek(key)); + if (atomicityMode() != TRANSACTIONAL_SNAPSHOT) { + assertEquals("Failed to check entry value on node: " + checkNodeId, + fullFailure ? initVal : val, cache.localPeek(key)); + } return null; } @@ -278,8 +285,22 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest } for (Map.Entry<Integer, String> e : map.entrySet()) { - for (Ignite g : G.allGrids()) - assertEquals(fullFailure ? initVal : e.getValue(), g.cache(DEFAULT_CACHE_NAME).get(e.getKey())); + long cntr0 = -1; + + for (Ignite g : G.allGrids()) { + Integer key = e.getKey(); + + assertEquals(fullFailure ? initVal : e.getValue(), g.cache(DEFAULT_CACHE_NAME).get(key)); + + if (g.affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(((IgniteEx)g).localNode(), key)) { + long nodeCntr = updateCoutner(g, key); + + if (cntr0 == -1) + cntr0 = nodeCntr; + + assertEquals(cntr0, nodeCntr); + } + } } } @@ -402,6 +423,9 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest assertFalse(e.getValue().isEmpty()); + if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) + continue; + for (ClusterNode node : e.getValue()) { final UUID checkNodeId = node.id(); @@ -425,8 +449,22 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest } for (Map.Entry<Integer, String> e : map.entrySet()) { - for (Ignite g : G.allGrids()) - assertEquals(!commmit ? initVal : e.getValue(), g.cache(DEFAULT_CACHE_NAME).get(e.getKey())); + long cntr0 = -1; + + for (Ignite g : G.allGrids()) { + Integer key = e.getKey(); + + assertEquals(!commmit ? initVal : e.getValue(), g.cache(DEFAULT_CACHE_NAME).get(key)); + + if (g.affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(((IgniteEx)g).localNode(), key)) { + long nodeCntr = updateCoutner(g, key); + + if (cntr0 == -1) + cntr0 = nodeCntr; + + assertEquals(cntr0, nodeCntr); + } + } } } @@ -529,4 +567,21 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest else return false; } -} \ No newline at end of file + + /** */ + private static long updateCoutner(Ignite ign, Object key) { + return dataStore(((IgniteEx)ign).cachex(DEFAULT_CACHE_NAME).context(), key) + .map(IgniteCacheOffheapManager.CacheDataStore::updateCounter) + .orElse(0L); + } + + /** */ + private static Optional<IgniteCacheOffheapManager.CacheDataStore> dataStore( + GridCacheContext<?, ?> cctx, Object key) { + int p = cctx.affinity().partition(key); + + return StreamSupport.stream(cctx.offheap().cacheDataStores().spliterator(), false) + .filter(ds -> ds.partId() == p) + .findFirst(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java index 07bbf6c..81d4796 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java @@ -148,4 +148,4 @@ public class GridCachePartitionedTxOriginatingNodeFailureSelfTest extends testTxOriginatingNodeFails(keys, false); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java index 23304a4..bb3fff0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java @@ -34,4 +34,4 @@ public class IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest return ccfg; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java index 00f9729..b0d083d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java @@ -19,8 +19,11 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.UUID; +import java.util.stream.StreamSupport; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; @@ -33,10 +36,13 @@ import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest; +import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; @@ -55,6 +61,7 @@ import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.transactions.Transaction; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.internal.processors.cache.ExchangeContext.IGNITE_EXCHANGE_COMPATIBILITY_VER_1; import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; @@ -114,6 +121,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends * @throws Exception If failed. */ public void testOptimisticPrimaryNodeFailureRecovery1() throws Exception { + if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return; + primaryNodeFailure(false, false, true); } @@ -121,6 +130,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends * @throws Exception If failed. */ public void testOptimisticPrimaryNodeFailureRecovery2() throws Exception { + if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return; + primaryNodeFailure(true, false, true); } @@ -128,6 +139,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends * @throws Exception If failed. */ public void testOptimisticPrimaryNodeFailureRollback1() throws Exception { + if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return; + primaryNodeFailure(false, true, true); } @@ -135,8 +148,11 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends * @throws Exception If failed. */ public void testOptimisticPrimaryNodeFailureRollback2() throws Exception { + if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return; + primaryNodeFailure(true, true, true); } + /** * @throws Exception If failed. */ @@ -245,8 +261,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { try { - checkKey(key1, rollback ? null : key1Nodes); - checkKey(key2, rollback ? null : key2Nodes); + checkKey(key1, rollback, key1Nodes, 0); + checkKey(key2, rollback, key2Nodes, 0); return true; } @@ -258,14 +274,16 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends } }, 5000); - checkKey(key1, rollback ? null : key1Nodes); - checkKey(key2, rollback ? null : key2Nodes); + checkKey(key1, rollback, key1Nodes, 0); + checkKey(key2, rollback, key2Nodes, 0); } /** * @throws Exception If failed. */ public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery1() throws Exception { + if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return; + primaryAndOriginatingNodeFailure(false, false, true); } @@ -273,6 +291,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends * @throws Exception If failed. */ public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery2() throws Exception { + if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return; + primaryAndOriginatingNodeFailure(true, false, true); } @@ -280,6 +300,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends * @throws Exception If failed. */ public void testOptimisticPrimaryAndOriginatingNodeFailureRollback1() throws Exception { + if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return; + primaryAndOriginatingNodeFailure(false, true, true); } @@ -287,6 +309,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends * @throws Exception If failed. */ public void testOptimisticPrimaryAndOriginatingNodeFailureRollback2() throws Exception { + if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return; + primaryAndOriginatingNodeFailure(true, true, true); } @@ -327,14 +351,14 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends private void primaryAndOriginatingNodeFailure(final boolean locBackupKey, final boolean rollback, boolean optimistic) - throws Exception - { + throws Exception { // TODO IGNITE-6174: when exchanges can be merged test fails because of IGNITE-6174. System.setProperty(IGNITE_EXCHANGE_COMPATIBILITY_VER_1, "true"); try { - IgniteCache<Integer, Integer> cache0 = jcache(0); - IgniteCache<Integer, Integer> cache2 = jcache(2); + int orig = 0; + + IgniteCache<Integer, Integer> origCache = jcache(orig); Affinity<Integer> aff = ignite(0).affinity(DEFAULT_CACHE_NAME); @@ -342,7 +366,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends for (int key = 0; key < 10_000; key++) { if (aff.isPrimary(ignite(1).cluster().localNode(), key)) { - if (locBackupKey == aff.isBackup(ignite(0).cluster().localNode(), key)) { + if (locBackupKey == aff.isBackup(ignite(orig).cluster().localNode(), key)) { key0 = key; break; @@ -353,27 +377,27 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends assertNotNull(key0); final Integer key1 = key0; - final Integer key2 = primaryKey(cache2); + final Integer key2 = primaryKey(jcache(2)); - int backups = cache0.getConfiguration(CacheConfiguration.class).getBackups(); + int backups = origCache.getConfiguration(CacheConfiguration.class).getBackups(); final Collection<ClusterNode> key1Nodes = - (locBackupKey && backups < 2) ? null : aff.mapKeyToPrimaryAndBackups(key1); + (locBackupKey && backups < 2) ? Collections.emptyList() : aff.mapKeyToPrimaryAndBackups(key1); final Collection<ClusterNode> key2Nodes = aff.mapKeyToPrimaryAndBackups(key2); - TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi(); + TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(orig).configuration().getCommunicationSpi(); - IgniteTransactions txs = ignite(0).transactions(); + IgniteTransactions txs = ignite(orig).transactions(); Transaction tx = txs.txStart(optimistic ? OPTIMISTIC : PESSIMISTIC, REPEATABLE_READ); log.info("Put key1 [key1=" + key1 + ", nodes=" + U.nodeIds(aff.mapKeyToPrimaryAndBackups(key1)) + ']'); - cache0.put(key1, key1); + origCache.put(key1, key1); log.info("Put key2 [key2=" + key2 + ", nodes=" + U.nodeIds(aff.mapKeyToPrimaryAndBackups(key2)) + ']'); - cache0.put(key2, key2); + origCache.put(key2, key2); log.info("Start prepare."); @@ -399,13 +423,13 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends log.info("Stop originating node."); - stopGrid(0); + stopGrid(orig); GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { try { - checkKey(key1, rollback ? null : key1Nodes); - checkKey(key2, rollback ? null : key2Nodes); + checkKey(key1, rollback, key1Nodes, 0); + checkKey(key2, rollback, key2Nodes, 0); return true; } catch (AssertionError e) { @@ -416,24 +440,23 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends } }, 5000); - checkKey(key1, rollback ? null : key1Nodes); - checkKey(key2, rollback ? null : key2Nodes); + checkKey(key1, rollback, key1Nodes, 0); + checkKey(key2, rollback, key2Nodes, 0); } finally { System.clearProperty(IGNITE_EXCHANGE_COMPATIBILITY_VER_1); } } - /** - * @param key Key. - * @param keyNodes Key nodes. - */ - private void checkKey(Integer key, Collection<ClusterNode> keyNodes) { - if (keyNodes == null) { - for (Ignite ignite : G.allGrids()) { - IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME); + /** */ + private void checkKey(Integer key, boolean rollback, Collection<ClusterNode> keyNodes, long initUpdCntr) { + if (rollback) { + if (atomicityMode() != TRANSACTIONAL_SNAPSHOT) { + for (Ignite ignite : G.allGrids()) { + IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME); - assertNull("Unexpected value for: " + ignite.name(), cache.localPeek(key)); + assertNull("Unexpected value for: " + ignite.name(), cache.localPeek(key)); + } } for (Ignite ignite : G.allGrids()) { @@ -441,10 +464,34 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends assertNull("Unexpected value for: " + ignite.name(), cache.get(key)); } + + boolean found = keyNodes.isEmpty(); + + long cntr0 = -1; + + for (ClusterNode node : keyNodes) { + try { + long nodeCntr = updateCoutner(grid(node), key); + + found = true; + + if (cntr0 == -1) + cntr0 = nodeCntr; + + assertEquals(cntr0, nodeCntr); + } + catch (IgniteIllegalStateException ignore) { + // No-op. + } + } + + assertTrue("Failed to find key node.", found); } - else { + else if (!keyNodes.isEmpty()) { boolean found = false; + long cntr0 = -1; + for (ClusterNode node : keyNodes) { try { Ignite ignite = grid(node); @@ -454,6 +501,13 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends ignite.cache(DEFAULT_CACHE_NAME); assertEquals("Unexpected value for: " + ignite.name(), key, key); + + long nodeCntr = updateCoutner(ignite, key); + + if (cntr0 == -1) + cntr0 = nodeCntr; + + assertTrue(nodeCntr == cntr0 && nodeCntr > initUpdCntr); } catch (IgniteIllegalStateException ignore) { // No-op. @@ -498,6 +552,23 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends assertTrue("Failed to wait for tx.", wait); } + /** */ + private static long updateCoutner(Ignite ign, Object key) { + return dataStore(((IgniteEx)ign).cachex(DEFAULT_CACHE_NAME).context(), key) + .map(IgniteCacheOffheapManager.CacheDataStore::updateCounter) + .orElse(0L); + } + + /** */ + private static Optional<IgniteCacheOffheapManager.CacheDataStore> dataStore( + GridCacheContext<?, ?> cctx, Object key) { + int p = cctx.affinity().partition(key); + + return StreamSupport.stream(cctx.offheap().cacheDataStores().spliterator(), false) + .filter(ds -> ds.partId() == p) + .findFirst(); + } + /** * */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedTxOriginatingNodeFailureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedTxOriginatingNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedTxOriginatingNodeFailureSelfTest.java index 79308c8..8730c5c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedTxOriginatingNodeFailureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedTxOriginatingNodeFailureSelfTest.java @@ -35,4 +35,4 @@ public class GridCacheReplicatedTxOriginatingNodeFailureSelfTest extends @Override protected Class<?> ignoreMessageClass() { return GridDistributedTxPrepareRequest.class; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index 35da7a4..ca3c09f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -74,6 +73,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryC import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTx; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.typedef.CI1; @@ -3181,7 +3181,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { MvccProcessorImpl crd = mvccProcessor(node); // Start query to prevent cleanup. - IgniteInternalFuture<MvccSnapshot> fut = crd.requestSnapshotAsync(); + IgniteInternalFuture<MvccSnapshot> fut = crd.requestSnapshotAsync((IgniteInternalTx)null); fut.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index 91c702e..0fef7b2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -521,7 +521,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC Affinity<Object> aff = grid(i).affinity(DEFAULT_CACHE_NAME); CachePartitionPartialCountersMap act = grid(i).cachex(DEFAULT_CACHE_NAME).context().topology() - .localUpdateCounters(false); + .localUpdateCounters(false, false); for (Map.Entry<Integer, Long> e : updCntrs.entrySet()) { if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode())) { http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java index a2c6c83..0cdd0c4 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java @@ -61,4 +61,4 @@ public class IgniteCacheTxRecoverySelfTestSuite extends TestSuite { return suite; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTxRecoveryTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTxRecoveryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTxRecoveryTest.java new file mode 100644 index 0000000..01f50cc --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTxRecoveryTest.java @@ -0,0 +1,654 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntPredicate; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.StreamSupport; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.transactions.Transaction; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest.NodeMode.CLIENT; +import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest.NodeMode.SERVER; +import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest.TxEndResult.COMMIT; +import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest.TxEndResult.ROLLBAK; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionState.COMMITTED; +import static org.apache.ignite.transactions.TransactionState.PREPARED; +import static org.apache.ignite.transactions.TransactionState.PREPARING; +import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; + +/** */ +public class CacheMvccTxRecoveryTest extends CacheMvccAbstractTest { + /** */ + public enum TxEndResult { + /** */ COMMIT, + /** */ ROLLBAK + } + + /** */ + public enum NodeMode { + /** */ SERVER, + /** */ CLIENT + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + throw new RuntimeException("Is not supposed to be used"); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + + return cfg; + } + + /** + * @throws Exception if failed. + */ + public void testRecoveryCommitNearFailure1() throws Exception { + checkRecoveryNearFailure(COMMIT, CLIENT); + } + + /** + * @throws Exception if failed. + */ + public void testRecoveryCommitNearFailure2() throws Exception { + checkRecoveryNearFailure(COMMIT, SERVER); + } + + /** + * @throws Exception if failed. + */ + public void testRecoveryRollbackNearFailure1() throws Exception { + checkRecoveryNearFailure(ROLLBAK, CLIENT); + } + + /** + * @throws Exception if failed. + */ + public void testRecoveryRollbackNearFailure2() throws Exception { + checkRecoveryNearFailure(ROLLBAK, SERVER); + } + + /** + * @throws Exception if failed. + */ + public void testRecoveryCommitPrimaryFailure1() throws Exception { + checkRecoveryPrimaryFailure(COMMIT, false); + } + + /** + * @throws Exception if failed. + */ + public void testRecoveryRollbackPrimaryFailure1() throws Exception { + checkRecoveryPrimaryFailure(ROLLBAK, false); + } + + /** + * @throws Exception if failed. + */ + public void testRecoveryCommitPrimaryFailure2() throws Exception { + checkRecoveryPrimaryFailure(COMMIT, true); + } + + /** + * @throws Exception if failed. + */ + public void testRecoveryRollbackPrimaryFailure2() throws Exception { + checkRecoveryPrimaryFailure(ROLLBAK, true); + } + + /** */ + private void checkRecoveryNearFailure(TxEndResult endRes, NodeMode nearNodeMode) throws Exception { + int gridCnt = 4; + int baseCnt = gridCnt - 1; + + boolean commit = endRes == COMMIT; + + startGridsMultiThreaded(baseCnt); + + // tweak client/server near + client = nearNodeMode == CLIENT; + + IgniteEx nearNode = startGrid(baseCnt); + + IgniteCache<Object, Object> cache = nearNode.getOrCreateCache(basicCcfg() + .setBackups(1)); + + Affinity<Object> aff = nearNode.affinity(DEFAULT_CACHE_NAME); + + List<Integer> keys = new ArrayList<>(); + + for (int i = 0; i < 100; i++) { + if (aff.isPrimary(grid(0).localNode(), i) && aff.isBackup(grid(1).localNode(), i)) { + keys.add(i); + break; + } + } + + for (int i = 0; i < 100; i++) { + if (aff.isPrimary(grid(1).localNode(), i) && aff.isBackup(grid(2).localNode(), i)) { + keys.add(i); + break; + } + } + + assert keys.size() == 2; + + TestRecordingCommunicationSpi nearComm + = (TestRecordingCommunicationSpi)nearNode.configuration().getCommunicationSpi(); + + if (!commit) + nearComm.blockMessages(GridNearTxPrepareRequest.class, grid(1).name()); + + GridTestUtils.runAsync(() -> { + // run in separate thread to exclude tx from thread-local map + GridNearTxLocal nearTx + = ((TransactionProxyImpl)nearNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)).tx(); + + for (Integer k : keys) + cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(k)); + + List<IgniteInternalTx> txs = IntStream.range(0, baseCnt) + .mapToObj(i -> txsOnNode(grid(i), nearTx.xidVersion())) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + IgniteInternalFuture<?> prepareFut = nearTx.prepareNearTxLocal(); + + if (commit) + prepareFut.get(); + else + assertConditionEventually(() -> txs.stream().anyMatch(tx -> tx.state() == PREPARED)); + + // drop near + nearNode.close(); + + assertConditionEventually(() -> txs.stream().allMatch(tx -> tx.state() == (commit ? COMMITTED : ROLLED_BACK))); + + return null; + }).get(); + + if (commit) { + assertConditionEventually(() -> { + int rowsCnt = grid(0).cache(DEFAULT_CACHE_NAME) + .query(new SqlFieldsQuery("select * from Integer")).getAll().size(); + return rowsCnt == keys.size(); + }); + } + else { + int rowsCnt = G.allGrids().get(0).cache(DEFAULT_CACHE_NAME) + .query(new SqlFieldsQuery("select * from Integer")).getAll().size(); + + assertEquals(0, rowsCnt); + } + + assertPartitionCountersAreConsistent(keys, grids(baseCnt, i -> true)); + } + + /** */ + private void checkRecoveryPrimaryFailure(TxEndResult endRes, boolean mvccCrd) throws Exception { + int gridCnt = 4; + int baseCnt = gridCnt - 1; + + boolean commit = endRes == COMMIT; + + startGridsMultiThreaded(baseCnt); + + client = true; + + IgniteEx nearNode = startGrid(baseCnt); + + IgniteCache<Object, Object> cache = nearNode.getOrCreateCache(basicCcfg() + .setBackups(1)); + + Affinity<Object> aff = nearNode.affinity(DEFAULT_CACHE_NAME); + + List<Integer> keys = new ArrayList<>(); + + for (int i = 0; i < 100; i++) { + if (aff.isPrimary(grid(0).localNode(), i) && aff.isBackup(grid(1).localNode(), i)) { + keys.add(i); + break; + } + } + + for (int i = 0; i < 100; i++) { + if (aff.isPrimary(grid(1).localNode(), i) && aff.isBackup(grid(2).localNode(), i)) { + keys.add(i); + break; + } + } + + assert keys.size() == 2; + + int victim, victimBackup; + + if (mvccCrd) { + victim = 0; + victimBackup = 1; + } + else { + victim = 1; + victimBackup = 2; + } + + TestRecordingCommunicationSpi victimComm = (TestRecordingCommunicationSpi)grid(victim).configuration().getCommunicationSpi(); + + if (commit) + victimComm.blockMessages(GridNearTxFinishResponse.class, nearNode.name()); + else + victimComm.blockMessages(GridDhtTxPrepareRequest.class, grid(victimBackup).name()); + + GridNearTxLocal nearTx + = ((TransactionProxyImpl)nearNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)).tx(); + + for (Integer k : keys) + cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(k)); + + List<IgniteInternalTx> txs = IntStream.range(0, baseCnt) + .filter(i -> i != victim) + .mapToObj(i -> txsOnNode(grid(i), nearTx.xidVersion())) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + IgniteInternalFuture<IgniteInternalTx> commitFut = nearTx.commitAsync(); + + if (commit) + assertConditionEventually(() -> txs.stream().allMatch(tx -> tx.state() == COMMITTED)); + else + assertConditionEventually(() -> txs.stream().anyMatch(tx -> tx.state() == PREPARED)); + + // drop victim + grid(victim).close(); + + awaitPartitionMapExchange(); + + assertConditionEventually(() -> txs.stream().allMatch(tx -> tx.state() == (commit ? COMMITTED : ROLLED_BACK))); + + assert victimComm.hasBlockedMessages(); + + if (commit) { + assertConditionEventually(() -> { + int rowsCnt = G.allGrids().get(0).cache(DEFAULT_CACHE_NAME) + .query(new SqlFieldsQuery("select * from Integer")).getAll().size(); + return rowsCnt == keys.size(); + }); + } + else { + int rowsCnt = G.allGrids().get(0).cache(DEFAULT_CACHE_NAME) + .query(new SqlFieldsQuery("select * from Integer")).getAll().size(); + + assertEquals(0, rowsCnt); + } + + assertTrue(commitFut.isDone()); + + assertPartitionCountersAreConsistent(keys, grids(baseCnt, i -> i != victim)); + } + + /** + * @throws Exception if failed. + */ + public void testRecoveryCommit() throws Exception { + startGridsMultiThreaded(2); + + client = true; + + IgniteEx ign = startGrid(2); + + IgniteCache<Object, Object> cache = ign.getOrCreateCache(basicCcfg()); + + AtomicInteger keyCntr = new AtomicInteger(); + + ArrayList<Integer> keys = new ArrayList<>(); + + ign.cluster().forServers().nodes() + .forEach(node -> keys.add(keyForNode(ign.affinity(DEFAULT_CACHE_NAME), keyCntr, node))); + + GridTestUtils.runAsync(() -> { + // run in separate thread to exclude tx from thread-local map + Transaction tx = ign.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); + + for (Integer k : keys) + cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(k)); + + ((TransactionProxyImpl)tx).tx().prepareNearTxLocal().get(); + + return null; + }).get(); + + // drop near + stopGrid(2, true); + + IgniteEx srvNode = grid(0); + + assertConditionEventually( + () -> srvNode.cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery("select * from Integer")).getAll().size() == 2 + ); + + assertPartitionCountersAreConsistent(keys, G.allGrids()); + } + + /** + * @throws Exception if failed. + */ + public void testCountersNeighborcastServerFailed() throws Exception { + int srvCnt = 4; + + startGridsMultiThreaded(srvCnt); + + client = true; + + IgniteEx ign = startGrid(srvCnt); + + IgniteCache<Object, Object> cache = ign.getOrCreateCache(basicCcfg() + .setBackups(2)); + + ArrayList<Integer> keys = new ArrayList<>(); + + int vid = 3; + + IgniteEx victim = grid(vid); + + Affinity<Object> aff = ign.affinity(DEFAULT_CACHE_NAME); + + for (int i = 0; i < 100; i++) { + if (aff.isPrimary(victim.localNode(), i) && !aff.isBackup(grid(0).localNode(), i)) { + keys.add(i); + break; + } + } + + for (int i = 0; i < 100; i++) { + if (aff.isPrimary(victim.localNode(), i) && !aff.isBackup(grid(1).localNode(), i)) { + keys.add(i); + break; + } + } + + assert keys.size() == 2 && !keys.contains(99); + + // prevent prepare on one backup + ((TestRecordingCommunicationSpi)victim.configuration().getCommunicationSpi()) + .blockMessages(GridDhtTxPrepareRequest.class, grid(0).name()); + + GridNearTxLocal nearTx = ((TransactionProxyImpl)ign.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)).tx(); + + for (Integer k : keys) + cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(k)); + + List<IgniteInternalTx> txs = IntStream.range(0, srvCnt) + .mapToObj(this::grid) + .filter(g -> g != victim) + .map(g -> txsOnNode(g, nearTx.xidVersion())) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + nearTx.commitAsync(); + + // await tx partially prepared + assertConditionEventually(() -> txs.stream().anyMatch(tx -> tx.state() == PREPARED)); + + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + + IgniteInternalFuture<Object> backgroundTxFut = GridTestUtils.runAsync(() -> { + try (Transaction ignored = ign.transactions().txStart()) { + boolean upd = false; + + for (int i = 100; i < 200; i++) { + if (!aff.isPrimary(victim.localNode(), i)) { + cache.put(i, 11); + upd = true; + break; + } + } + + assert upd; + + latch1.countDown(); + + latch2.await(); + } + + return null; + }); + + latch1.await(); + + // drop primary + victim.close(); + + // do all assertions before rebalance + assertConditionEventually(() -> txs.stream().allMatch(tx -> tx.state() == ROLLED_BACK)); + + List<IgniteEx> liveNodes = grids(srvCnt, i -> i != vid); + + assertPartitionCountersAreConsistent(keys, liveNodes); + + latch2.countDown(); + + backgroundTxFut.get(); + + assertTrue(liveNodes.stream() + .map(node -> node.cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery("select * from Integer")).getAll()) + .allMatch(Collection::isEmpty)); + } + + /** + * @throws Exception if failed. + */ + public void testUpdateCountersGapIsClosed() throws Exception { + int srvCnt = 3; + + startGridsMultiThreaded(srvCnt); + + client = true; + + IgniteEx ign = startGrid(srvCnt); + + IgniteCache<Object, Object> cache = ign.getOrCreateCache( + basicCcfg().setBackups(2)); + + int vid = 1; + + IgniteEx victim = grid(vid); + + ArrayList<Integer> keys = new ArrayList<>(); + + Integer part = null; + + Affinity<Object> aff = ign.affinity(DEFAULT_CACHE_NAME); + + for (int i = 0; i < 2000; i++) { + int p = aff.partition(i); + if (aff.isPrimary(victim.localNode(), i)) { + if (part == null) part = p; + if (p == part) keys.add(i); + if (keys.size() == 2) break; + } + } + + assert keys.size() == 2; + + Transaction txA = ign.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); + + // prevent first transaction prepare on backups + ((TestRecordingCommunicationSpi)victim.configuration().getCommunicationSpi()) + .blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + final AtomicInteger limiter = new AtomicInteger(); + + @Override public boolean apply(ClusterNode node, Message msg) { + if (msg instanceof GridDhtTxPrepareRequest) + return limiter.getAndIncrement() < 2; + + return false; + } + }); + + cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(keys.get(0))); + + txA.commitAsync(); + + GridCacheVersion aXidVer = ((TransactionProxyImpl)txA).tx().xidVersion(); + + assertConditionEventually(() -> txsOnNode(victim, aXidVer).stream() + .anyMatch(tx -> tx.state() == PREPARING)); + + GridTestUtils.runAsync(() -> { + try (Transaction txB = ign.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(keys.get(1))); + + txB.commit(); + } + }).get(); + + long victimUpdCntr = updateCounter(victim.cachex(DEFAULT_CACHE_NAME).context(), keys.get(0)); + + List<IgniteEx> backupNodes = grids(srvCnt, i -> i != vid); + + List<IgniteInternalTx> backupTxsA = backupNodes.stream() + .map(node -> txsOnNode(node, aXidVer)) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + // drop primary + victim.close(); + + assertConditionEventually(() -> backupTxsA.stream().allMatch(tx -> tx.state() == ROLLED_BACK)); + + backupNodes.stream() + .map(node -> node.cache(DEFAULT_CACHE_NAME)) + .forEach(c -> { + assertEquals(1, c.query(new SqlFieldsQuery("select * from Integer")).getAll().size()); + }); + + backupNodes.forEach(node -> { + for (Integer k : keys) + assertEquals(victimUpdCntr, updateCounter(node.cachex(DEFAULT_CACHE_NAME).context(), k)); + }); + } + + /** */ + private static CacheConfiguration<Object, Object> basicCcfg() { + return new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setAtomicityMode(TRANSACTIONAL_SNAPSHOT) + .setCacheMode(PARTITIONED) + .setIndexedTypes(Integer.class, Integer.class); + } + + /** */ + private static List<IgniteInternalTx> txsOnNode(IgniteEx node, GridCacheVersion xidVer) { + List<IgniteInternalTx> txs = node.context().cache().context().tm().activeTransactions().stream() + .peek(tx -> assertEquals(xidVer, tx.nearXidVersion())) + .collect(Collectors.toList()); + + assert !txs.isEmpty(); + + return txs; + } + + /** */ + private static void assertConditionEventually(GridAbsPredicate p) + throws IgniteInterruptedCheckedException { + if (!GridTestUtils.waitForCondition(p, 5_000)) + fail(); + } + + /** */ + private List<IgniteEx> grids(int cnt, IntPredicate p) { + return IntStream.range(0, cnt).filter(p).mapToObj(this::grid).collect(Collectors.toList()); + } + + /** */ + private void assertPartitionCountersAreConsistent(Iterable<Integer> keys, Iterable<? extends Ignite> nodes) { + for (Integer key : keys) { + long cntr0 = -1; + + for (Ignite n : nodes) { + IgniteEx node = ((IgniteEx)n); + + if (node.affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(node.localNode(), key)) { + long cntr = updateCounter(node.cachex(DEFAULT_CACHE_NAME).context(), key); +// System.err.println(node.localNode().consistentId() + " " + key + " -> " + cntr); + if (cntr0 == -1) + cntr0 = cntr; + + assertEquals(cntr0, cntr); + } + } + } + } + + /** */ + private static long updateCounter(GridCacheContext<?, ?> cctx, Object key) { + return dataStore(cctx, key) + .map(IgniteCacheOffheapManager.CacheDataStore::updateCounter) + .get(); + } + + /** */ + private static Optional<IgniteCacheOffheapManager.CacheDataStore> dataStore( + GridCacheContext<?, ?> cctx, Object key) { + int p = cctx.affinity().partition(key); + IgniteCacheOffheapManager offheap = cctx.offheap(); + return StreamSupport.stream(offheap.cacheDataStores().spliterator(), false) + .filter(ds -> ds.partId() == p) + .findFirst(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java index cf68546..a0d492c 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; @@ -84,7 +85,7 @@ public class GridIndexRebuildWithMvccEnabledSelfTest extends GridIndexRebuildSel * @throws IgniteCheckedException if failed. */ private static void lockVersion(IgniteEx node) throws IgniteCheckedException { - node.context().coordinators().requestSnapshotAsync().get(); + node.context().coordinators().requestSnapshotAsync((IgniteInternalTx)null).get(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java index ce2a130..15045c9 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java @@ -18,6 +18,12 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColocatedTxPessimisticOriginatingNodeFailureSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest; +import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxPessimisticOriginatingNodeFailureSelfTest; import org.apache.ignite.internal.processors.cache.index.MvccEmptyTransactionSelfTest; import org.apache.ignite.internal.processors.cache.index.SqlTransactionsCommandsWithMvccEnabledSelfTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccBasicContinuousQueryTest; @@ -60,10 +66,13 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlLockTimeoutT import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlUpdateCountersTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccStreamingInsertTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxNodeMappingTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest; import org.apache.ignite.internal.processors.cache.mvcc.MvccRepeatableReadBulkOpsTest; import org.apache.ignite.internal.processors.cache.mvcc.MvccRepeatableReadOperationsTest; import org.apache.ignite.internal.processors.query.h2.GridIndexRebuildWithMvccEnabledSelfTest; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; + /** * */ @@ -140,6 +149,55 @@ public class IgniteCacheMvccSqlTestSuite extends TestSuite { suite.addTestSuite(CacheMvccContinuousWithTransformerPartitionedSelfTest.class); suite.addTestSuite(CacheMvccContinuousWithTransformerReplicatedSelfTest.class); + // Transaction recovery. + suite.addTestSuite(CacheMvccTxRecoveryTest.class); + + suite.addTestSuite(MvccPartitionedPrimaryNodeFailureRecoveryTest.class); + suite.addTestSuite(MvccPartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.class); + suite.addTestSuite(MvccColocatedTxPessimisticOriginatingNodeFailureRecoveryTest.class); + suite.addTestSuite(MvccReplicatedTxPessimisticOriginatingNodeFailureRecoveryTest.class); + return suite; } + + /** */ + public static class MvccPartitionedPrimaryNodeFailureRecoveryTest + extends IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL_SNAPSHOT; + } + } + + /** */ + public static class MvccPartitionedTwoBackupsPrimaryNodeFailureRecoveryTest + extends IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL_SNAPSHOT; + } + + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } + } + + /** */ + public static class MvccColocatedTxPessimisticOriginatingNodeFailureRecoveryTest + extends GridCacheColocatedTxPessimisticOriginatingNodeFailureSelfTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL_SNAPSHOT; + } + } + + /** */ + public static class MvccReplicatedTxPessimisticOriginatingNodeFailureRecoveryTest + extends GridCacheReplicatedTxPessimisticOriginatingNodeFailureSelfTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL_SNAPSHOT; + } + } }
