Repository: ignite Updated Branches: refs/heads/master 0e0d94c24 -> 6247ac719
IGNITE-2325 - Fixed assertion in optimistic tx prepare future on remap - Fixes #434. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6247ac71 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6247ac71 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6247ac71 Branch: refs/heads/master Commit: 6247ac719a5a7643a317c8e4574565c15fe2e588 Parents: 0e0d94c Author: Alexey Goncharuk <[email protected]> Authored: Mon Feb 15 20:12:04 2016 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Mon Feb 15 20:12:04 2016 +0300 ---------------------------------------------------------------------- .../near/GridNearOptimisticTxPrepareFuture.java | 18 +- .../IgniteCacheNearRestartRollbackSelfTest.java | 276 +++++++++++++++++++ .../testsuites/IgniteCacheRestartTestSuite.java | 2 + 3 files changed, 288 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6247ac71/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 8476dc3..f146071 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -263,9 +263,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa IgniteTxEntry singleWrite = tx.singleWrite(); if (singleWrite != null) - prepareSingle(singleWrite, topLocked); + prepareSingle(singleWrite, topLocked, remap); else - prepare(tx.writeEntries(), topLocked); + prepare(tx.writeEntries(), topLocked, remap); markInitialized(); } @@ -278,7 +278,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * @param write Write. * @param topLocked {@code True} if thread already acquired lock preventing topology change. */ - private void prepareSingle(IgniteTxEntry write, boolean topLocked) { + private void prepareSingle(IgniteTxEntry write, boolean topLocked, boolean remap) { write.clearEntryReadVersion(); AffinityTopologyVersion topVer = tx.topologyVersion(); @@ -287,7 +287,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa txMapping = new GridDhtTxMapping(); - GridDistributedTxMapping mapping = map(write, topVer, null, topLocked); + GridDistributedTxMapping mapping = map(write, topVer, null, topLocked, remap); if (mapping.node().isLocal()) { if (write.context().isNear()) @@ -325,7 +325,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa */ private void prepare( Iterable<IgniteTxEntry> writes, - boolean topLocked + boolean topLocked, + boolean remap ) { AffinityTopologyVersion topVer = tx.topologyVersion(); @@ -343,7 +344,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa for (IgniteTxEntry write : writes) { write.clearEntryReadVersion(); - GridDistributedTxMapping updated = map(write, topVer, cur, topLocked); + GridDistributedTxMapping updated = map(write, topVer, cur, topLocked, remap); if (cur != updated) { mappings.offer(updated); @@ -508,7 +509,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa IgniteTxEntry entry, AffinityTopologyVersion topVer, @Nullable GridDistributedTxMapping cur, - boolean topLocked + boolean topLocked, + boolean remap ) { GridCacheContext cacheCtx = entry.context(); @@ -542,7 +544,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa entry.cached(cacheCtx.local().entryEx(entry.key(), topVer)); if (cacheCtx.isNear() || cacheCtx.isLocal()) { - if (entry.explicitVersion() == null) { + if (entry.explicitVersion() == null && !remap) { if (keyLockFut == null) { keyLockFut = new KeyLockFuture(); http://git-wip-us.apache.org/repos/asf/ignite/blob/6247ac71/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java new file mode 100644 index 0000000..6941bcc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicLong; +import javax.cache.Cache; +import javax.cache.CacheException; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cluster.ClusterTopologyException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionRollbackException; + +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * + */ +public class IgniteCacheNearRestartRollbackSelfTest extends GridCommonAbstractTest { + /** Shared IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** + * The number of entries to put to the test cache. + */ + private static final int ENTRY_COUNT = 100; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + cfg.setCacheConfiguration(cacheConfiguration(gridName)); + + if (getTestGridName(3).equals(gridName)) + cfg.setClientMode(true); + + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); + + return cfg; + } + + /** + * @param gridName Grid name. + * @return Cache configuration. + */ + protected CacheConfiguration<Object, Object> cacheConfiguration(String gridName) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + + ccfg.setBackups(1); + + ccfg.setNearConfiguration(new NearCacheConfiguration<>()); + + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + + ccfg.setRebalanceMode(CacheRebalanceMode.SYNC); + + return ccfg; + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + public void testRestarts() throws Exception { + startGrids(4); + + Ignite tester = ignite(3); + + final AtomicLong lastUpdateTs = new AtomicLong(System.currentTimeMillis()); + + try { + Set<Integer> keys = new LinkedHashSet<>(); + + for (int i = 0; i < ENTRY_COUNT; i++) + keys.add(i); + + IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < 50; i++) { + stopGrid(0); + + startGrid(0); + + stopGrid(1); + + startGrid(1); + + stopGrid(2); + + startGrid(2); + + synchronized (lastUpdateTs) { + while (System.currentTimeMillis() - lastUpdateTs.get() > 1_000) { + info("Will wait for an update operation to finish."); + + lastUpdateTs.wait(1_000); + } + } + } + + return null; + } + }); + + int currentValue = 0; + boolean invoke = false; + + while (!fut.isDone()) { + updateCache(tester, currentValue, invoke, false, keys); + + updateCache(tester, currentValue + 1, invoke, true, keys); + + invoke = !invoke; + currentValue++; + + synchronized (lastUpdateTs) { + lastUpdateTs.set(System.currentTimeMillis()); + + lastUpdateTs.notifyAll(); + } + } + } + finally { + stopAllGrids(); + } + } + + /** + * Updates the cache or rollback the update. + * + * @param ignite Ignite instance to use. + * @param newValue the new value to put to the entries + * @param invoke whether to use invokeAll() or putAll() + * @param rollback whether to rollback the changes or commit + * @param keys Collection of keys to update. + */ + private void updateCache( + Ignite ignite, + int newValue, + boolean invoke, + boolean rollback, + Set<Integer> keys + ) { + final IgniteCache<Integer, Integer> cache = ignite.cache(null); + + if (rollback) { + while (true) { + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + updateEntries(cache, newValue, invoke, keys); + + tx.rollback(); + + break; + } + catch (CacheException e) { + if (e.getCause() instanceof ClusterTopologyException) { + ClusterTopologyException topEx = (ClusterTopologyException)e.getCause(); + + topEx.retryReadyFuture().get(); + } + else + throw e; + } + catch (ClusterTopologyException e) { + IgniteFuture<?> fut = e.retryReadyFuture(); + + fut.get(); + } + catch (TransactionRollbackException ignore) { + // Safe to retry right away. + } + } + } + else + updateEntries(cache, newValue, invoke, keys); + } + + /** + * Update the cache using either invokeAll() or putAll(). + * + * @param cache the cache + * @param newValue the new value to put to the entries + * @param invoke whether to use invokeAll() or putAll() + */ + private void updateEntries( + Cache<Integer, Integer> cache, + int newValue, + boolean invoke, + Set<Integer> keys + ) { + if (invoke) + cache.invokeAll(keys, new IntegerSetValue(newValue)); + else { + final Map<Integer, Integer> entries = new HashMap<>(ENTRY_COUNT); + + for (final Integer key : keys) + entries.put(key, newValue); + + cache.putAll(entries); + } + } + + /** + * {@link EntryProcessor} used to update the entry value. + */ + private static class IntegerSetValue implements EntryProcessor<Integer, Integer, Boolean>, Serializable { + /** */ + private final int newValue; + + /** + * @param newValue New value. + */ + private IntegerSetValue(final int newValue) { + this.newValue = newValue; + } + + /** {@inheritDoc} */ + @Override public Boolean process(MutableEntry<Integer, Integer> entry, Object... arguments) + throws EntryProcessorException { + entry.setValue(newValue); + + return Boolean.TRUE; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6247ac71/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java index a6bd785..9040ea5 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java @@ -19,6 +19,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.IgniteCacheCreateRestartSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheNearRestartRollbackSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledOptimisticTxNodeRestartTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedNodeRestartTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedOptimisticTxNodeRestartTest; @@ -39,6 +40,7 @@ public class IgniteCacheRestartTestSuite extends TestSuite { suite.addTestSuite(GridCachePartitionedOptimisticTxNodeRestartTest.class); suite.addTestSuite(GridCacheReplicatedNodeRestartSelfTest.class); suite.addTestSuite(GridCachePartitionedNearDisabledOptimisticTxNodeRestartTest.class); + suite.addTestSuite(IgniteCacheNearRestartRollbackSelfTest.class); suite.addTestSuite(IgniteCacheCreateRestartSelfTest.class);
