[ https://issues.apache.org/jira/browse/IGNITE-4982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Konstantin Dudkov updated IGNITE-4982: -------------------------------------- Description: GridCacheAbstractRemoveFailureTest (and some child tests) fails. Reproducer: {code:java}
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.util.lang.GridTuple;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jsr166.ConcurrentHashMap8;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;

/**
 * Tests that removes are not lost when topology changes.
 * <p>
 * One thread performs random puts/removes through the cache of grid 0 and records the expected
 * value of every touched key, a second thread keeps stopping and restarting nodes, and the test
 * thread periodically pauses both (via a {@link CyclicBarrier}) to compare every node's cache
 * content with the expected values.
 */
public abstract class GridCacheAbstractRemoveFailureTest extends GridCommonAbstractTest {
    /** IP finder. */
    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);

    /** Number of grids started before the tests and verified by {@link #assertCacheContent(Map)}. */
    private static final int GRID_CNT = 3;

    /** Keys count: keys are drawn from {@code [0, KEYS_CNT)}. */
    private static final int KEYS_CNT = 10_000;

    /** Test duration (compared against {@code U.currentTimeMillis()}). */
    private static final long DUR = 90 * 1000L;

    /** Cache data assert frequency. */
    private static final long ASSERT_FREQ = 10_000;

    /** Kill delay: {@code (min, max)} bounds of the random sleep before each kill. */
    private static final T2<Integer, Integer> KILL_DELAY = new T2<>(2000, 5000);

    /** Start delay: {@code (min, max)} bounds of the random sleep between kill and restart. */
    private static final T2<Integer, Integer> START_DELAY = new T2<>(2000, 5000);

    /** Value of the delete history size system property saved before the tests ({@code null} if it was not set). */
    private static String sizePropVal;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER).setForceServerMode(true);

        // Only the first grid may become a client; it is the node the updates are sent from.
        if (testClientNode() && getTestIgniteInstanceName(0).equals(igniteInstanceName))
            cfg.setClientMode(true);

        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        // Need to increase value set in GridAbstractTest
        sizePropVal = System.getProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE);

        System.setProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, "100000");

        startGrids(GRID_CNT);
    }

    /** {@inheritDoc} */
    @Override protected void afterTestsStopped() throws Exception {
        super.afterTestsStopped();

        // Restore the property exactly as it was: if it was not set before the tests, clear it
        // instead of leaving an empty (non-numeric) value behind for subsequent tests.
        if (sizePropVal != null)
            System.setProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, sizePropVal);
        else
            System.clearProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE);

        stopAllGrids();
    }

    /** {@inheritDoc} */
    @Override protected long getTestTimeout() {
        return DUR + 60_000;
    }

    /**
     * @return Cache mode.
     */
    protected abstract CacheMode cacheMode();

    /**
     * @return Cache atomicity mode.
     */
    protected abstract CacheAtomicityMode atomicityMode();

    /**
     * @return Near cache configuration.
     */
    protected abstract NearCacheConfiguration nearCache();

    /**
     * @return Atomic cache write order mode.
     */
    protected CacheAtomicWriteOrderMode atomicWriteOrderMode() {
        return null;
    }

    /**
     * @return {@code True} if test updates from client node.
     */
    protected boolean testClientNode() {
        return false;
    }

    /**
     * Runs the put/remove scenario for {@link #DUR} without explicit transactions.
     *
     * @throws Exception If failed.
     */
    public void testPutAndRemove() throws Exception {
        putAndRemove(DUR, null, null);
    }

    /**
     * Runs the put/remove scenario inside pessimistic repeatable-read transactions.
     * Does nothing unless the cache is transactional.
     *
     * @throws Exception If failed.
     */
    public void testPutAndRemovePessimisticTx() throws Exception {
        if (atomicityMode() != CacheAtomicityMode.TRANSACTIONAL)
            return;

        putAndRemove(30_000, PESSIMISTIC, REPEATABLE_READ);
    }

    /**
     * Runs the put/remove scenario inside optimistic serializable transactions.
     * Does nothing unless the cache is transactional.
     *
     * @throws Exception If failed.
     */
    public void testPutAndRemoveOptimisticSerializableTx() throws Exception {
        if (atomicityMode() != CacheAtomicityMode.TRANSACTIONAL)
            return;

        putAndRemove(30_000, OPTIMISTIC, SERIALIZABLE);
    }

    /**
     * Recreates the cache, then concurrently updates it and restarts nodes for the given duration,
     * periodically checking that every node holds the expected values.
     *
     * @param duration Test duration.
     * @param txConcurrency Transaction concurrency if test explicit transaction.
     * @param txIsolation Transaction isolation if test explicit transaction.
     * @throws Exception If failed.
     */
    private void putAndRemove(long duration,
        final TransactionConcurrency txConcurrency,
        final TransactionIsolation txIsolation) throws Exception {
        assertEquals(testClientNode(), (boolean) grid(0).configuration().isClientMode());

        grid(0).destroyCache(null);

        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();

        ccfg.setWriteSynchronizationMode(FULL_SYNC);
        ccfg.setCacheMode(cacheMode());

        if (cacheMode() == PARTITIONED)
            ccfg.setBackups(1);

        ccfg.setAtomicityMode(atomicityMode());
        ccfg.setAtomicWriteOrderMode(atomicWriteOrderMode());
        ccfg.setNearConfiguration(nearCache());

        final IgniteCache<Integer, Integer> sndCache0 = grid(0).createCache(ccfg);

        final AtomicBoolean stop = new AtomicBoolean();

        final AtomicLong cntr = new AtomicLong();

        final AtomicLong errCntr = new AtomicLong();

        // Expected values in cache.
        final Map<Integer, GridTuple<Integer>> expVals = new ConcurrentHashMap8<>();

        // Non-null while a content check is pending; worker threads then park on this barrier.
        final AtomicReference<CyclicBarrier> cmp = new AtomicReference<>();

        IgniteInternalFuture<?> updateFut = GridTestUtils.runAsync(new Callable<Void>() {
            @Override public Void call() throws Exception {
                Thread.currentThread().setName("update-thread");

                ThreadLocalRandom rnd = ThreadLocalRandom.current();

                IgniteTransactions txs = sndCache0.unwrap(Ignite.class).transactions();

                while (!stop.get()) {
                    for (int i = 0; i < 100; i++) {
                        int key = rnd.nextInt(KEYS_CNT);

                        boolean put = rnd.nextInt(0, 100) > 10;

                        // Retry the same operation until it either succeeds or is skipped.
                        while (true) {
                            try {
                                if (put) {
                                    boolean failed = false;

                                    if (txConcurrency != null) {
                                        try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
                                            sndCache0.put(key, i);

                                            tx.commit();
                                        }
                                        catch (CacheException | IgniteException e) {
                                            if (!X.hasCause(e, ClusterTopologyCheckedException.class)) {
                                                // Pass the throwable so the stack trace is not lost.
                                                log.error("Unexpected error: " + e, e);

                                                throw e;
                                            }

                                            // Topology change: skip this update, expected value stays as is.
                                            failed = true;
                                        }
                                    }
                                    else
                                        sndCache0.put(key, i);

                                    if (!failed)
                                        expVals.put(key, F.t(i));
                                }
                                else {
                                    boolean failed = false;

                                    if (txConcurrency != null) {
                                        try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
                                            sndCache0.remove(key);

                                            tx.commit();
                                        }
                                        catch (CacheException | IgniteException e) {
                                            if (!X.hasCause(e, ClusterTopologyCheckedException.class)) {
                                                // Pass the throwable so the stack trace is not lost.
                                                log.error("Unexpected error: " + e, e);

                                                throw e;
                                            }

                                            // Topology change: skip this update, expected value stays as is.
                                            failed = true;
                                        }
                                    }
                                    else
                                        sndCache0.remove(key);

                                    // A removed key is tracked as a tuple holding null.
                                    if (!failed)
                                        expVals.put(key, F.<Integer>t(null));
                                }

                                break;
                            }
                            catch (CacheException e) {
                                if (put)
                                    log.error("Put failed [key=" + key + ", val=" + i + ']', e);
                                else
                                    log.error("Remove failed [key=" + key + ']', e);

                                errCntr.incrementAndGet();
                            }
                        }
                    }

                    cntr.addAndGet(100);

                    CyclicBarrier barrier = cmp.get();

                    if (barrier != null) {
                        log.info("Wait data check.");

                        barrier.await(60_000, TimeUnit.MILLISECONDS);

                        log.info("Finished wait data check.");
                    }
                }

                return null;
            }
        });

        IgniteInternalFuture<?> killFut = GridTestUtils.runAsync(new Callable<Void>() {
            @Override public Void call() throws Exception {
                Thread.currentThread().setName("restart-thread");

                while (!stop.get()) {
                    U.sleep(random(KILL_DELAY.get1(), KILL_DELAY.get2()));

                    killAndRestart(stop);

                    CyclicBarrier barrier = cmp.get();

                    if (barrier != null) {
                        log.info("Wait data check.");

                        barrier.await(60_000, TimeUnit.MILLISECONDS);

                        log.info("Finished wait data check.");
                    }
                }

                return null;
            }
        });

        try {
            long stopTime = duration + U.currentTimeMillis();

            long nextAssert = U.currentTimeMillis() + ASSERT_FREQ;

            while (U.currentTimeMillis() < stopTime) {
                long start = System.nanoTime();

                long ops = cntr.longValue();

                U.sleep(1000);

                long diff = cntr.longValue() - ops;

                double time = (System.nanoTime() - start) / 1_000_000_000d;

                long opsPerSecond = (long)(diff / time);

                log.info("Operations/second: " + opsPerSecond);

                if (U.currentTimeMillis() >= nextAssert) {
                    // Three parties: this thread, the update thread and the restart thread.
                    // The barrier action performs the content check once all of them have arrived.
                    CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
                        @Override public void run() {
                            try {
                                cmp.set(null);

                                log.info("Checking cache content.");

                                assertCacheContent(expVals);

                                log.info("Finished check cache content.");
                            }
                            catch (Throwable e) {
                                log.error("Unexpected error: " + e, e);

                                throw e;
                            }
                        }
                    });

                    log.info("Start cache content check.");

                    cmp.set(barrier);

                    try {
                        barrier.await(60_000, TimeUnit.MILLISECONDS);
                    }
                    catch (TimeoutException e) {
                        U.dumpThreads(log);

                        fail("Failed to check cache content: " + e);
                    }

                    log.info("Cache content check done.");

                    // Same clock as the comparison above, so the check interval is measured consistently.
                    nextAssert = U.currentTimeMillis() + ASSERT_FREQ;
                }
            }
        }
        finally {
            stop.set(true);
        }

        killFut.get();

        updateFut.get();

        log.info("Test finished. Update errors: " + errCntr.get());
    }

    /**
     * Stops a randomly chosen node, waits for a random start delay and starts it again.
     * Grid 0 (the node updates are sent from) is never chosen.
     *
     * @param stop Stop flag.
     * @throws Exception If failed.
     */
    private void killAndRestart(AtomicBoolean stop) throws Exception {
        if (stop.get())
            return;

        // NOTE(review): the upper bound of random() is exclusive, so idx may equal GRID_CNT - an index
        // that startGrids(GRID_CNT) presumably did not start and assertCacheContent() never checks.
        // Confirm whether this is intended before changing it.
        int idx = random(1, GRID_CNT + 1);

        log.info("Killing node " + idx);

        stopGrid(idx);

        U.sleep(random(START_DELAY.get1(), START_DELAY.get2()));

        log.info("Restarting node " + idx);

        startGrid(idx);

        if (stop.get())
            return;

        U.sleep(1000);
    }

    /**
     * Reads every expected key from the cache of each of the first {@link #GRID_CNT} grids and fails
     * if any value differs from the expected one; every mismatch is logged with the node's role for the key.
     *
     * @param expVals Expected values in cache.
     */
    @SuppressWarnings({"TooBroadScope", "ConstantIfStatement"})
    private void assertCacheContent(Map<Integer, GridTuple<Integer>> expVals) {
        assert !expVals.isEmpty();

        Collection<Integer> failedKeys = new HashSet<>();

        for (int i = 0; i < GRID_CNT; i++) {
            Ignite ignite = grid(i);

            IgniteCache<Integer, Integer> cache = ignite.cache(null);

            for (Map.Entry<Integer, GridTuple<Integer>> expVal : expVals.entrySet()) {
                Integer val = cache.get(expVal.getKey());

                if (!F.eq(expVal.getValue().get(), val)) {
                    failedKeys.add(expVal.getKey());

                    boolean primary = affinity(cache).isPrimary(ignite.cluster().localNode(), expVal.getKey());
                    boolean backup = affinity(cache).isBackup(ignite.cluster().localNode(), expVal.getKey());

                    log.error("Unexpected cache data [exp=" + expVal +
                        ", actual=" + val +
                        ", nodePrimary=" + primary +
                        ", nodeBackup=" + backup +
                        ", nodeIdx=" + i +
                        ", nodeId=" + ignite.cluster().localNode().id() + ']');
                }
            }
        }

        assertTrue("Unexpected data for keys: " + failedKeys, failedKeys.isEmpty());
    }

    /**
     * @param min Min possible value.
     * @param max Max possible value (exclusive).
     * @return Random value, or {@code max} itself when {@code max == min}.
     */
    private static int random(int min, int max) {
        if (max == min)
            return max;

        return ThreadLocalRandom.current().nextInt(min, max);
    }
}
{code} > GridCacheAbstractRemoveFailureTest fail > --------------------------------------- > > Key: IGNITE-4982 > URL: https://issues.apache.org/jira/browse/IGNITE-4982 > Project: Ignite > Issue Type: Bug > Components: cache > Reporter: Konstantin Dudkov > Assignee: Konstantin Dudkov > Fix For: 2.0 > > > GridCacheAbstractRemoveFailureTest (and some child tests) fails. 
Reproducer: > {code:java} > import java.util.Collection; > import java.util.HashSet; > import java.util.Map; > import java.util.concurrent.Callable; > import java.util.concurrent.CyclicBarrier; > import java.util.concurrent.ThreadLocalRandom; > import java.util.concurrent.TimeUnit; > import java.util.concurrent.TimeoutException; > import java.util.concurrent.atomic.AtomicBoolean; > import java.util.concurrent.atomic.AtomicLong; > import java.util.concurrent.atomic.AtomicReference; > import javax.cache.CacheException; > import org.apache.ignite.Ignite; > import org.apache.ignite.IgniteCache; > import org.apache.ignite.IgniteException; > import org.apache.ignite.IgniteTransactions; > import org.apache.ignite.cache.CacheAtomicWriteOrderMode; > import org.apache.ignite.cache.CacheAtomicityMode; > import org.apache.ignite.cache.CacheMode; > import org.apache.ignite.configuration.CacheConfiguration; > import org.apache.ignite.configuration.IgniteConfiguration; > import org.apache.ignite.configuration.NearCacheConfiguration; > import org.apache.ignite.internal.IgniteInternalFuture; > import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; > import org.apache.ignite.internal.util.lang.GridTuple; > import org.apache.ignite.internal.util.typedef.F; > import org.apache.ignite.internal.util.typedef.T2; > import org.apache.ignite.internal.util.typedef.X; > import org.apache.ignite.internal.util.typedef.internal.U; > import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; > import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; > import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; > import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; > import org.apache.ignite.testframework.GridTestUtils; > import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; > import org.apache.ignite.transactions.Transaction; > import org.apache.ignite.transactions.TransactionConcurrency; > import 
org.apache.ignite.transactions.TransactionIsolation; > import org.jsr166.ConcurrentHashMap8; > import static > org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE; > import static org.apache.ignite.cache.CacheMode.PARTITIONED; > import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; > import static > org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; > import static > org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; > import static > org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; > import static > org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; > /** > * Tests that removes are not lost when topology changes. > */ > public abstract class GridCacheAbstractRemoveFailureTest extends > GridCommonAbstractTest { > /** IP finder. */ > private static final TcpDiscoveryIpFinder IP_FINDER = new > TcpDiscoveryVmIpFinder(true); > /** */ > private static final int GRID_CNT = 3; > /** Keys count. */ > private static final int KEYS_CNT = 10_000; > /** Test duration. */ > private static final long DUR = 90 * 1000L; > /** Cache data assert frequency. */ > private static final long ASSERT_FREQ = 10_000; > /** Kill delay. */ > private static final T2<Integer, Integer> KILL_DELAY = new T2<>(2000, > 5000); > /** Start delay. 
*/ > private static final T2<Integer, Integer> START_DELAY = new T2<>(2000, > 5000); > /** */ > private static String sizePropVal; > /** {@inheritDoc} */ > @Override protected IgniteConfiguration getConfiguration(String > igniteInstanceName) throws Exception { > IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); > > ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER).setForceServerMode(true); > if (testClientNode() && > getTestIgniteInstanceName(0).equals(igniteInstanceName)) > cfg.setClientMode(true); > > ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); > return cfg; > } > /** {@inheritDoc} */ > @Override protected void beforeTestsStarted() throws Exception { > // Need to increase value set in GridAbstractTest > sizePropVal = > System.getProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE); > System.setProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, "100000"); > startGrids(GRID_CNT); > } > /** {@inheritDoc} */ > @Override protected void afterTestsStopped() throws Exception { > super.afterTestsStopped(); > System.setProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, > sizePropVal != null ? sizePropVal : ""); > stopAllGrids(); > } > /** {@inheritDoc} */ > @Override protected long getTestTimeout() { > return DUR + 60_000; > } > /** > * @return Cache mode. > */ > protected abstract CacheMode cacheMode(); > /** > * @return Cache atomicity mode. > */ > protected abstract CacheAtomicityMode atomicityMode(); > /** > * @return Near cache configuration. > */ > protected abstract NearCacheConfiguration nearCache(); > /** > * @return Atomic cache write order mode. > */ > protected CacheAtomicWriteOrderMode atomicWriteOrderMode() { > return null; > } > /** > * @return {@code True} if test updates from client node. > */ > protected boolean testClientNode() { > return false; > } > /** > * @throws Exception If failed. 
> */ > public void testPutAndRemove() throws Exception { > putAndRemove(DUR, null, null); > } > /** > * @throws Exception If failed. > */ > public void testPutAndRemovePessimisticTx() throws Exception { > if (atomicityMode() != CacheAtomicityMode.TRANSACTIONAL) > return; > putAndRemove(30_000, PESSIMISTIC, REPEATABLE_READ); > } > /** > * @throws Exception If failed. > */ > public void testPutAndRemoveOptimisticSerializableTx() throws Exception { > if (atomicityMode() != CacheAtomicityMode.TRANSACTIONAL) > return; > putAndRemove(30_000, OPTIMISTIC, SERIALIZABLE); > } > /** > * @param duration Test duration. > * @param txConcurrency Transaction concurrency if test explicit > transaction. > * @param txIsolation Transaction isolation if test explicit transaction. > * @throws Exception If failed. > */ > private void putAndRemove(long duration, > final TransactionConcurrency txConcurrency, > final TransactionIsolation txIsolation) throws Exception { > assertEquals(testClientNode(), (boolean) > grid(0).configuration().isClientMode()); > grid(0).destroyCache(null); > CacheConfiguration<Integer, Integer> ccfg = new > CacheConfiguration<>(); > ccfg.setWriteSynchronizationMode(FULL_SYNC); > ccfg.setCacheMode(cacheMode()); > if (cacheMode() == PARTITIONED) > ccfg.setBackups(1); > ccfg.setAtomicityMode(atomicityMode()); > ccfg.setAtomicWriteOrderMode(atomicWriteOrderMode()); > ccfg.setNearConfiguration(nearCache()); > final IgniteCache<Integer, Integer> sndCache0 = > grid(0).createCache(ccfg); > final AtomicBoolean stop = new AtomicBoolean(); > final AtomicLong cntr = new AtomicLong(); > final AtomicLong errCntr = new AtomicLong(); > // Expected values in cache. 
> final Map<Integer, GridTuple<Integer>> expVals = new > ConcurrentHashMap8<>(); > final AtomicReference<CyclicBarrier> cmp = new AtomicReference<>(); > IgniteInternalFuture<?> updateFut = GridTestUtils.runAsync(new > Callable<Void>() { > @Override public Void call() throws Exception { > Thread.currentThread().setName("update-thread"); > ThreadLocalRandom rnd = ThreadLocalRandom.current(); > IgniteTransactions txs = > sndCache0.unwrap(Ignite.class).transactions(); > while (!stop.get()) { > for (int i = 0; i < 100; i++) { > int key = rnd.nextInt(KEYS_CNT); > boolean put = rnd.nextInt(0, 100) > 10; > while (true) { > try { > if (put) { > boolean failed = false; > if (txConcurrency != null) { > try (Transaction tx = > txs.txStart(txConcurrency, txIsolation)) { > sndCache0.put(key, i); > tx.commit(); > } > catch (CacheException | > IgniteException e) { > if (!X.hasCause(e, > ClusterTopologyCheckedException.class)) { > log.error("Unexpected error: > " + e); > throw e; > } > failed = true; > } > } > else > sndCache0.put(key, i); > if (!failed) > expVals.put(key, F.t(i)); > } > else { > boolean failed = false; > if (txConcurrency != null) { > try (Transaction tx = > txs.txStart(txConcurrency, txIsolation)) { > sndCache0.remove(key); > tx.commit(); > } > catch (CacheException | > IgniteException e) { > if (!X.hasCause(e, > ClusterTopologyCheckedException.class)) { > log.error("Unexpected error: > " + e); > throw e; > } > failed = true; > } > } > else > sndCache0.remove(key); > if (!failed) > expVals.put(key, F.<Integer>t(null)); > } > break; > } > catch (CacheException e) { > if (put) > log.error("Put failed [key=" + key + ", > val=" + i + ']', e); > else > log.error("Remove failed [key=" + key + > ']', e); > errCntr.incrementAndGet(); > } > } > } > cntr.addAndGet(100); > CyclicBarrier barrier = cmp.get(); > if (barrier != null) { > log.info("Wait data check."); > barrier.await(60_000, TimeUnit.MILLISECONDS); > log.info("Finished wait data check."); > } > } > return null; 
> } > }); > IgniteInternalFuture<?> killFut = GridTestUtils.runAsync(new > Callable<Void>() { > @Override public Void call() throws Exception { > Thread.currentThread().setName("restart-thread"); > while (!stop.get()) { > U.sleep(random(KILL_DELAY.get1(), KILL_DELAY.get2())); > killAndRestart(stop); > CyclicBarrier barrier = cmp.get(); > if (barrier != null) { > log.info("Wait data check."); > barrier.await(60_000, TimeUnit.MILLISECONDS); > log.info("Finished wait data check."); > } > } > return null; > } > }); > try { > long stopTime = duration + U.currentTimeMillis() ; > long nextAssert = U.currentTimeMillis() + ASSERT_FREQ; > while (U.currentTimeMillis() < stopTime) { > long start = System.nanoTime(); > long ops = cntr.longValue(); > U.sleep(1000); > long diff = cntr.longValue() - ops; > double time = (System.nanoTime() - start) / 1_000_000_000d; > long opsPerSecond = (long)(diff / time); > log.info("Operations/second: " + opsPerSecond); > if (U.currentTimeMillis() >= nextAssert) { > CyclicBarrier barrier = new CyclicBarrier(3, new > Runnable() { > @Override public void run() { > try { > cmp.set(null); > log.info("Checking cache content."); > assertCacheContent(expVals); > log.info("Finished check cache content."); > } > catch (Throwable e) { > log.error("Unexpected error: " + e, e); > throw e; > } > } > }); > log.info("Start cache content check."); > cmp.set(barrier); > try { > barrier.await(60_000, TimeUnit.MILLISECONDS); > } > catch (TimeoutException e) { > U.dumpThreads(log); > fail("Failed to check cache content: " + e); > } > log.info("Cache content check done."); > nextAssert = System.currentTimeMillis() + ASSERT_FREQ; > } > } > } > finally { > stop.set(true); > } > killFut.get(); > updateFut.get(); > log.info("Test finished. Update errors: " + errCntr.get()); > } > /** > * @param stop Stop flag. > * @throws Exception If failed. 
> */ > private void killAndRestart(AtomicBoolean stop) throws Exception { > if (stop.get()) > return; > int idx = random(1, GRID_CNT + 1); > log.info("Killing node " + idx); > stopGrid(idx); > U.sleep(random(START_DELAY.get1(), START_DELAY.get2())); > log.info("Restarting node " + idx); > startGrid(idx); > if (stop.get()) > return; > U.sleep(1000); > } > /** > * @param expVals Expected values in cache. > */ > @SuppressWarnings({"TooBroadScope", "ConstantIfStatement"}) > private void assertCacheContent(Map<Integer, GridTuple<Integer>> expVals) > { > assert !expVals.isEmpty(); > Collection<Integer> failedKeys = new HashSet<>(); > for (int i = 0; i < GRID_CNT; i++) { > Ignite ignite = grid(i); > IgniteCache<Integer, Integer> cache = ignite.cache(null); > for (Map.Entry<Integer, GridTuple<Integer>> expVal : > expVals.entrySet()) { > Integer val = cache.get(expVal.getKey()); > if (!F.eq(expVal.getValue().get(), val)) { > failedKeys.add(expVal.getKey()); > boolean primary = > affinity(cache).isPrimary(ignite.cluster().localNode(), expVal.getKey()); > boolean backup = > affinity(cache).isBackup(ignite.cluster().localNode(), expVal.getKey()); > log.error("Unexpected cache data [exp=" + expVal + > ", actual=" + val + > ", nodePrimary=" + primary + > ", nodeBackup=" + backup + > ", nodeIdx" + i + > ", nodeId=" + ignite.cluster().localNode().id() + > ']'); > } > } > } > assertTrue("Unexpected data for keys: " + failedKeys, > failedKeys.isEmpty()); > } > /** > * @param min Min possible value. > * @param max Max possible value (exclusive). > * @return Random value. > */ > private static int random(int min, int max) { > if (max == min) > return max; > return ThreadLocalRandom.current().nextInt(min, max); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)