http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java index 151167a..1259f3e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java @@ -22,6 +22,8 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.cache.processor.EntryProcessor; @@ -41,6 +43,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; @@ -58,21 +61,16 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes private static final int GRID_CNT = 2; /** Number of increment iterations. 
*/ - private static final int NUM_SETS = 50; + private static final int INCREMENTS = 100; + + /** */ + private static final int KEYS = 50; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - CacheConfiguration cache = new CacheConfiguration(); - - cache.setCacheMode(PARTITIONED); - cache.setAtomicityMode(atomicityMode()); - cache.setWriteSynchronizationMode(FULL_SYNC); - cache.setBackups(1); - cache.setRebalanceMode(SYNC); - - cfg.setCacheConfiguration(cache); + cfg.setCacheConfiguration(cacheConfiguration()); TcpDiscoverySpi disco = new TcpDiscoverySpi(); @@ -90,6 +88,21 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes } /** + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration() { + CacheConfiguration cache = new CacheConfiguration(); + + cache.setCacheMode(PARTITIONED); + cache.setAtomicityMode(atomicityMode()); + cache.setWriteSynchronizationMode(FULL_SYNC); + cache.setBackups(1); + cache.setRebalanceMode(SYNC); + + return cache; + } + + /** * @return Atomicity mode. */ protected CacheAtomicityMode atomicityMode() { @@ -121,6 +134,76 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes } /** + * @throws Exception If failed. + */ + public void testEntryProcessorNodeLeave() throws Exception { + startGrid(GRID_CNT); + + // TODO: IGNITE-1525 (test fails with one-phase commit).
+ boolean createCache = atomicityMode() == TRANSACTIONAL; + + String cacheName = null; + + if (createCache) { + CacheConfiguration ccfg = cacheConfiguration(); + + ccfg.setName("cache-2"); + ccfg.setBackups(2); + + ignite(0).createCache(ccfg); + + cacheName = ccfg.getName(); + } + + try { + int NODES = GRID_CNT + 1; + + final int RESTART_IDX = GRID_CNT + 1; + + for (int iter = 0; iter < 10; iter++) { + log.info("Iteration: " + iter); + + startGrid(RESTART_IDX); + + awaitPartitionMapExchange(); + + final CountDownLatch latch = new CountDownLatch(1); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + latch.await(); + + stopGrid(RESTART_IDX); + + return null; + } + }, "stop-thread"); + + int increments = checkIncrement(cacheName, iter % 2 == 0, fut, latch); + + assertTrue(increments >= INCREMENTS); + + fut.get(); + + for (int i = 0; i < KEYS; i++) { + for (int g = 0; g < NODES; g++) { + Set<String> vals = ignite(g).<String, Set<String>>cache(cacheName).get("set-" + i); + + assertNotNull(vals); + assertEquals(increments, vals.size()); + } + } + + ignite(0).cache(cacheName).removeAll(); + } + } + finally { + if (createCache) + ignite(0).destroyCache(cacheName); + } + } + + /** * @param invokeAll If {@code true} tests invokeAll operation. * @throws Exception If failed.
*/ @@ -146,7 +229,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes }, 1, "starter"); try { - checkIncrement(invokeAll); + checkIncrement(null, invokeAll, null, null); } finally { stop.set(true); @@ -154,12 +237,12 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes fut.get(getTestTimeout()); } - for (int i = 0; i < NUM_SETS; i++) { + for (int i = 0; i < KEYS; i++) { for (int g = 0; g < GRID_CNT + started; g++) { Set<String> vals = ignite(g).<String, Set<String>>cache(null).get("set-" + i); assertNotNull(vals); - assertEquals(100, vals.size()); + assertEquals(INCREMENTS, vals.size()); } } } @@ -170,17 +253,29 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes } /** + * @param cacheName Cache name. * @param invokeAll If {@code true} tests invokeAll operation. + * @param fut If not null then executes updates while future is not done. + * @param latch Latch to count down when first update is done. * @throws Exception If failed. + * @return Number of increments. 
*/ - private void checkIncrement(boolean invokeAll) throws Exception { - for (int k = 0; k < 100; k++) { + private int checkIncrement( + String cacheName, + boolean invokeAll, + @Nullable IgniteInternalFuture<?> fut, + @Nullable CountDownLatch latch) throws Exception { + int increments = 0; + + for (int k = 0; k < INCREMENTS || (fut != null && !fut.isDone()); k++) { + increments++; + if (invokeAll) { - IgniteCache<String, Set<String>> cache = ignite(0).cache(null); + IgniteCache<String, Set<String>> cache = ignite(0).cache(cacheName); Map<String, Processor> procs = new LinkedHashMap<>(); - for (int i = 0; i < NUM_SETS; i++) { + for (int i = 0; i < KEYS; i++) { String key = "set-" + i; String val = "value-" + k; @@ -198,19 +293,31 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes } } else { - IgniteCache<String, Set<String>> cache = ignite(0).cache(null); + IgniteCache<String, Set<String>> cache = ignite(0).cache(cacheName); - for (int i = 0; i < NUM_SETS; i++) { + for (int i = 0; i < KEYS; i++) { String key = "set-" + i; String val = "value-" + k; Integer valsCnt = cache.invoke(key, new Processor(val)); - assertEquals(k + 1, (Object)valsCnt); + Integer exp = k + 1; + + if (!exp.equals(valsCnt)) + log.info("Unexpected return value [valsCnt=" + valsCnt + + ", exp=" + exp + + ", cacheVal=" + cache.get(key) + ']'); + + assertEquals(exp, valsCnt); } } + + if (latch != null && k == 0) + latch.countDown(); } + + return increments; } /**
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheIncrementTxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheIncrementTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheIncrementTxTest.java new file mode 100644 index 0000000..40854e4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheIncrementTxTest.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * Checks that per-key counters incremented in pessimistic repeatable-read transactions match the cache contents on every node while nodes are started and, when backups are configured, stopped under load. + */ +public class IgniteCacheIncrementTxTest extends GridCommonAbstractTest { + /** IP finder shared by the discovery SPI of every node started by this test. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Number of server nodes; the node with index {@code SRVS} is configured in client mode. */ + private static final int SRVS = 4; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + +
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + if (getTestGridName(SRVS).equals(gridName)) + cfg.setClientMode(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(SRVS); + + startGrid(SRVS); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testIncrementTxTopologyChange0() throws Exception { + nodeJoin(cacheConfiguration(0)); + } + + /** + * @throws Exception If failed. + */ + public void testIncrementTxTopologyChange1() throws Exception { + nodeJoin(cacheConfiguration(1)); + } + + /** + * @throws Exception If failed. + */ + public void testIncrementTxTopologyChange2() throws Exception { + nodeJoin(cacheConfiguration(2)); + } + + /** + * @param ccfg Configuration of the cache that is created for the run and destroyed when it ends. + * @throws Exception If failed.
+ */ + private void nodeJoin(CacheConfiguration ccfg) throws Exception { + ignite(0).createCache(ccfg); + + try { + final Map<Integer, AtomicInteger> incMap = new LinkedHashMap<>(); + + final int KEYS = 10; + + for (int i = 0; i < KEYS; i++) + incMap.put(i, new AtomicInteger()); + + final int NODES = SRVS + 1; + + final int START_NODES = 5; + + final AtomicInteger nodeIdx = new AtomicInteger(NODES); + + final IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + int node = nodeIdx.getAndIncrement(); + + Ignite ignite = startGrid(node); + + IgniteCache<Integer, Integer> cache = ignite.cache(null); + + for (int i = 0; i < 1000; i++) + incrementTx(ignite, cache, incMap); + + return null; + } + }, START_NODES, "start-thread"); + + IgniteInternalFuture<?> txFut = updateFuture(NODES, incMap, fut); + + fut.get(); + txFut.get(); + + log.info("First updates: " + incMap); + + checkCache(NODES + START_NODES, incMap); + + if (ccfg.getBackups() > 0) { + for (int i = 0; i < START_NODES; i++) { + final int stopIdx = NODES + i; + + IgniteInternalFuture<?> stopFut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + U.sleep(500); + + stopGrid(stopIdx); + + return null; + } + }, "stop-thread"); + + txFut = updateFuture(NODES, incMap, stopFut); + + stopFut.get(); + txFut.get(); + + checkCache(NODES + START_NODES - (i + 1), incMap); + + for (int n = 0; n < SRVS; n++) + ignite(n).cache(null).rebalance().get(); + } + } + else { + for (int i = 0; i < START_NODES; i++) + stopGrid(NODES + i); + + return; + } + + log.info("Second updates: " + incMap); + + checkCache(NODES, incMap); + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + + /** + * @param expNodes Expected number of started nodes. + * @param incMap Expected counter value for every key.
+ */ + private void checkCache(int expNodes, Map<Integer, AtomicInteger> incMap) { + List<Ignite> nodes = G.allGrids(); + + assertEquals(expNodes, nodes.size()); + + for (Ignite node : nodes) { + IgniteCache<Integer, Integer> cache = node.cache(null); + + for (Map.Entry<Integer, AtomicInteger> e : incMap.entrySet()) + assertEquals((Integer)e.getValue().get(), cache.get(e.getKey())); + } + } + + /** + * @param nodes Number of nodes to run updates on (three update threads are started per node). + * @param incMap Per-key counters bumped after every committed increment. + * @param fut Updates keep running until this future is done, then 50 more are made per thread. + * @return Future that completes when all update threads have finished. + */ + private IgniteInternalFuture<?> updateFuture(final int nodes, + final Map<Integer, AtomicInteger> incMap, + final IgniteInternalFuture<?> fut) { + final AtomicInteger threadIdx = new AtomicInteger(0); + + return GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + int node = threadIdx.incrementAndGet() % nodes; + + Ignite ignite = grid(node); + + Thread.currentThread().setName("update-" + ignite.name()); + + IgniteCache<Integer, Integer> cache = ignite.cache(null); + + while (!fut.isDone()) + incrementTx(ignite, cache, incMap); + + for (int i = 0; i < 50; i++) + incrementTx(ignite, cache, incMap); + + return null; + } + }, nodes * 3, "update-thread"); + } + + /** + * @param ignite Node used to start the transaction. + * @param cache Cache whose values are read and incremented inside the transaction. + * @param incMap Per-key counters, incremented only for keys written by a transaction whose commit returned normally.
+ */ + private void incrementTx(Ignite ignite, IgniteCache<Integer, Integer> cache, Map<Integer, AtomicInteger> incMap) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + boolean singleKey = rnd.nextBoolean(); + + List<Integer> keys = new ArrayList<>(incMap.size()); + + try { + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (Integer key : incMap.keySet()) { + if (singleKey || rnd.nextBoolean()) { + Integer val = cache.get(key); + + if (val == null) + val = 1; + else + val = val + 1; + + cache.put(key, val); + + keys.add(key); + } + + if (singleKey) + break; + } + + tx.commit(); + + for (Integer key : keys) + incMap.get(key).incrementAndGet(); + } + } + catch (Exception e) { + log.info("Tx failed: " + e); + } + } + + /** + * @param backups Number of backups. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration(int backups) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setBackups(backups); + + return ccfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java index 9c1abc7..f9608e1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java @@ -83,7 +83,6 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes } if
(getTestGridName(10).equals(gridName)) { - CacheConfiguration cc = cfg.getCacheConfiguration()[0]; cc.setRebalanceDelay(-1); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java index 73388fb..40b2cfd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java @@ -18,9 +18,12 @@ package org.apache.ignite.internal.processors.cache; import java.io.IOException; +import java.util.List; import javax.cache.CacheException; import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.U; /** * Checks behavior on exception while unmarshalling key. @@ -43,11 +46,11 @@ public class IgniteCacheP2pUnmarshallingRebalanceErrorTest extends IgniteCacheP2 startGrid(3); - //GridDhtPartitionSupplyMessage unmarshalling failed but ioManager does not hangs up. + // GridDhtPartitionSupplyMessage unmarshalling failed but ioManager does not hangs up. Thread.sleep(1000); - //GridDhtForceKeysRequest unmarshalling failed test. + // GridDhtForceKeysRequest unmarshalling failed test. 
stopGrid(3); readCnt.set(Integer.MAX_VALUE); @@ -55,13 +58,38 @@ public class IgniteCacheP2pUnmarshallingRebalanceErrorTest extends IgniteCacheP2 for (int i = 0; i <= 100; i++) jcache(0).put(new TestKey(String.valueOf(++key)), ""); - startGrid(10); //custom rebalanceDelay set at cfg. + startGrid(10); // Custom rebalanceDelay set at cfg. Affinity<Object> aff = affinity(grid(10).cache(null)); - while (!aff.isPrimary(grid(10).localNode(), new TestKey(String.valueOf(key)))) + GridCacheContext cctx = grid(10).context().cache().cache(null).context(); + + List<List<ClusterNode>> affAssign = + cctx.affinity().assignment(cctx.affinity().affinityTopologyVersion()).idealAssignment(); + + Integer part = null; + + ClusterNode node = grid(10).localNode(); + + for (int p = 0; p < aff.partitions(); p++) { + if (affAssign.get(p).get(0).equals(node)) { + part = p; + + break; + } + } + + assertNotNull(part); + + long stopTime = U.currentTimeMillis() + 5000; + + while (!part.equals(aff.partition(new TestKey(String.valueOf(key))))) { --key; + if (U.currentTimeMillis() > stopTime) + fail(); + } + readCnt.set(1); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingTxErrorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingTxErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingTxErrorTest.java index 4ea8f91..e2d3169 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingTxErrorTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingTxErrorTest.java @@ -43,6 +43,8 @@ public class IgniteCacheP2pUnmarshallingTxErrorTest extends IgniteCacheP2pUnmars @Override protected 
IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + cfg.setLateAffinityAssignment(false); + if (!gridName.endsWith("0")) cfg.getCacheConfiguration()[0].setRebalanceDelay(-1); // Allows to check GridDhtLockRequest fail. http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientAffinityAssignmentSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientAffinityAssignmentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientAffinityAssignmentSelfTest.java index 9280c04..d782c9f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientAffinityAssignmentSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientAffinityAssignmentSelfTest.java @@ -103,7 +103,7 @@ public class IgniteClientAffinityAssignmentSelfTest extends GridCommonAbstractTe private void checkAffinityFunction() throws Exception { cache = true; - startGrids(3); + startGridsMultiThreaded(3, true); long topVer = 3; http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartStopConcurrentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartStopConcurrentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartStopConcurrentTest.java index a208b07..084be02 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartStopConcurrentTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartStopConcurrentTest.java @@ -67,9 +67,11 @@ public class IgniteDynamicCacheStartStopConcurrentTest extends GridCommonAbstrac * @throws Exception If failed. */ public void testConcurrentStartStop() throws Exception { - checkTopologyVersion(new AffinityTopologyVersion(NODES, 0)); + awaitPartitionMapExchange(); - int minorVer = 0; + int minorVer = ignite(0).configuration().isLateAffinityAssignment() ? 1 : 0; + + checkTopologyVersion(new AffinityTopologyVersion(NODES, minorVer)); for (int i = 0; i < 5; i++) { log.info("Iteration: " + i); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxReentryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxReentryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxReentryAbstractSelfTest.java index cfa1244..9aa0b86 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxReentryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxReentryAbstractSelfTest.java @@ -99,7 +99,7 @@ public abstract class IgniteTxReentryAbstractSelfTest extends GridCommonAbstract /** @throws Exception If failed. 
*/ public void testLockReentry() throws Exception { - startGrids(gridCount()); + startGridsMultiThreaded(gridCount(), true); try { IgniteCache<Object, Object> cache = grid(0).cache(null); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java index fbd72bf..74023e9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java @@ -969,6 +969,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @return Future. 
*/ IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> cb) { + final AtomicInteger nodeIdx = new AtomicInteger(G.allGrids().size()); + return GridTestUtils.runMultiThreadedAsync(new CA() { @Override public void apply() { try { @@ -976,17 +978,19 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig if (failed.get()) return; - String name = UUID.randomUUID().toString(); + int idx = nodeIdx.getAndIncrement(); + + Thread.currentThread().setName("thread-" + getTestGridName(idx)); try { - log.info("Start node: " + name); + log.info("Start node: " + getTestGridName(idx)); - Ignite g = startGrid(name); + Ignite g = startGrid(idx); cb.apply(g); } finally { - stopGrid(name); + stopGrid(idx); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java index 03502d3..8fcda3f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java @@ -219,6 +219,8 @@ public class CacheGetInsideLockChangingTopologyTest extends GridCommonAbstractTe Ignite srv = startGrid(NEW_NODE); + awaitPartitionMapExchange(); + try { Integer key = primaryKey(srv.cache(cacheName)); @@ -268,6 +270,8 @@ public class CacheGetInsideLockChangingTopologyTest extends GridCommonAbstractTe Ignite srv = startGrid(NEW_NODE); + awaitPartitionMapExchange(); + try { Integer key = 
primaryKey(srv.cache(cacheName)); @@ -320,6 +324,8 @@ public class CacheGetInsideLockChangingTopologyTest extends GridCommonAbstractTe Ignite srv = startGrid(NEW_NODE); + awaitPartitionMapExchange(); + try { Integer key = primaryKey(srv.cache(cacheName)); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentFairAffinityTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentFairAffinityTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentFairAffinityTest.java new file mode 100644 index 0000000..0ab2314 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentFairAffinityTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; +import org.jetbrains.annotations.Nullable; + +/** + * Variant of {@link CacheLateAffinityAssignmentTest} whose caches use a {@link FairAffinityFunction}, with the default partition count when none is requested. + */ +public class CacheLateAffinityAssignmentFairAffinityTest extends CacheLateAffinityAssignmentTest { + /** {@inheritDoc} */ + @Override protected AffinityFunction affinityFunction(@Nullable Integer parts) { + return new FairAffinityFunction(false, parts == null ? FairAffinityFunction.DFLT_PART_CNT : parts); + } +}
