IGNITE-2539 issues reproduce test
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6485ae58 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6485ae58 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6485ae58 Branch: refs/heads/ignite-3727-2 Commit: 6485ae58c1f6dafdbfcf4de4f258f1386affa723 Parents: 784958b Author: DmitriyGovorukhin <[email protected]> Authored: Thu Sep 8 14:55:09 2016 +0300 Committer: DmitriyGovorukhin <[email protected]> Committed: Thu Sep 8 14:55:09 2016 +0300 ---------------------------------------------------------------------- .../ignite/internal/util/IgniteUtils.java | 41 ++++ .../IssuesIGNITE2539ReproduceTest.java | 227 +++++++++++++++++++ 2 files changed, 268 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6485ae58/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index e3389d5..9fd4122 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -1244,6 +1244,47 @@ public abstract class IgniteUtils { } /** + * Performs thread dump and return all available info. 
+     */
+    public static GridStringBuilder dumpThreads() {
+        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+
+        // Timestamp that goes into the header line of the dump.
+        String dumpTime = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss z").format(new Date(U.currentTimeMillis()));
+
+        GridStringBuilder out = new GridStringBuilder("Thread dump at ");
+
+        out.a(dumpTime).a(NL);
+
+        final Set<Long> deadlockedIds = getDeadlockedThreadIds(threadBean);
+
+        out.a(NL);
+
+        // One-line deadlock summary placed before the per-thread details.
+        String summary = deadlockedIds.isEmpty() ?
+            "No deadlocked threads detected." :
+            "Deadlocked threads detected (see thread dump below) " + "[deadlockedThreadsCnt=" + deadlockedIds.size() + ']';
+
+        out.a(summary);
+
+        out.a(NL);
+
+        // Ask only for the monitor/synchronizer details this JVM reports as supported.
+        boolean withMonitors = threadBean.isObjectMonitorUsageSupported();
+        boolean withSynchronizers = threadBean.isSynchronizerUsageSupported();
+
+        for (ThreadInfo info : threadBean.dumpAllThreads(withMonitors, withSynchronizers)) {
+            printThreadInfo(info, out, deadlockedIds);
+
+            out.a(NL);
+
+            // Nothing more to print for threads that hold no ownable synchronizers.
+            if (info.getLockedSynchronizers() == null || info.getLockedSynchronizers().length == 0)
+                continue;
+
+            printSynchronizersInfo(info.getLockedSynchronizers(), out);
+
+            out.a(NL);
+        }
+
+        out.a(NL);
+
+        return out;
+    }
+
+    /**
      * Get deadlocks from the thread bean.
      * @param mxBean the bean
      * @return the set of deadlocked threads (may be empty Set, but never null).
http://git-wip-us.apache.org/repos/asf/ignite/blob/6485ae58/modules/core/src/test/java/org/apache/ignite/cache/affinity/bugreproduce/IssuesIGNITE2539ReproduceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/bugreproduce/IssuesIGNITE2539ReproduceTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/bugreproduce/IssuesIGNITE2539ReproduceTest.java
new file mode 100644
index 0000000..c39ebf0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/bugreproduce/IssuesIGNITE2539ReproduceTest.java
@@ -0,0 +1,227 @@
+package org.apache.ignite.cache.affinity.bugreproduce;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.affinity.rendezvous.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Reproducer for IGNITE-2539: caches are created and destroyed in a background loop while extra nodes
+ * are started and stopped. A test fails when a thread whose name contains {@code "exchange-worker"}
+ * reports an uncaught exception.
+ *
+ * Created by dgovorukhin on 01.09.2016.
+ */
+public class IssuesIGNITE2539ReproduceTest extends GridCommonAbstractTest {
+    /** Shared IP finder for all nodes started by this test. */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of extra nodes started and stopped by each test pass. */
+    private static final int ITERATIONS = 10;
+
+    /** Seconds to wait for the background cache killer to notice the stop flag. */
+    private static final long KILLER_STOP_TIMEOUT_SEC = 30;
+
+    /** Partitioned cache name prefix. */
+    private static final String CACHE_NAME_DHT_PARTITIONED = "cacheP";
+
+    /** Replicated cache name prefix. */
+    private static final String CACHE_NAME_DHT_REPLICATED = "cacheR";
+
+    /** Ignite. */
+    private static Ignite ignite1;
+
+    /** Ignite. */
+    private static Ignite ignite2;
+
+    /** Ignite. */
+    private static Ignite ignite3;
+
+    /** First uncaught exception seen by an exchange worker (used by {@link #testCacheStopping()}). */
+    private static final AtomicReference<Throwable> exc = new AtomicReference<>();
+
+    /** Thread dump taken when the first uncaught exception was reported. */
+    private static final AtomicReference<GridStringBuilder> dump = new AtomicReference<>();
+
+    /** Thread that reported the first uncaught exception. */
+    private static final AtomicReference<Thread> thr = new AtomicReference<>();
+
+    /**
+     * @return Affinity function to test.
+     */
+    private AffinityFunction affinityFunction() {
+        return new RendezvousAffinityFunction();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        ignite1 = startGrid(0);
+        ignite2 = startGrid(1);
+        ignite3 = startGrid(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        iCfg.setRebalanceThreadPoolSize(2);
+
+        return iCfg;
+    }
+
+    /**
+     * Starts and stops {@link #ITERATIONS} extra nodes once while a background task keeps creating and
+     * destroying caches. Fails, after logging the exception and a thread dump, if an exchange worker
+     * reported an uncaught exception.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCacheStopping() throws Exception {
+        final int delta = 5;
+
+        for (Thread worker : getExchangeWorkerThread()) {
+            worker.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+                @Override public void uncaughtException(Thread t, Throwable e) {
+                    // 'exc' is published last: the test thread treats a non-null 'exc' as the signal
+                    // that 'dump' and 'thr' are already set and safe to dereference.
+                    dump.compareAndSet(null, U.dumpThreads());
+                    thr.compareAndSet(null, t);
+                    exc.compareAndSet(null, e);
+                }
+            });
+        }
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        final CountDownLatch killerDone = new CountDownLatch(1);
+
+        GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                try {
+                    for (int start = 0; !stop.get() && Ignition.allGrids().contains(ignite1); start += delta)
+                        churnCaches(start, delta);
+                }
+                finally {
+                    killerDone.countDown();
+                }
+
+                return null;
+            }
+        }, "CacheSerialKiller");
+
+        try {
+            for (int i = delta; i < ITERATIONS + delta; i++)
+                startGrid(i);
+
+            U.sleep(500);
+
+            for (int i = delta; i < ITERATIONS + delta; i++)
+                stopGrid(i);
+        }
+        finally {
+            // Without this the killer keeps running into the next test method.
+            stop.set(true);
+        }
+
+        if (!killerDone.await(KILLER_STOP_TIMEOUT_SEC, TimeUnit.SECONDS))
+            log.info("CacheSerialKiller did not stop within " + KILLER_STOP_TIMEOUT_SEC + " seconds.");
+
+        if (exc.get() != null) {
+            log.info(thr.get().getName());
+
+            Throwable e = exc.get();
+            log.error(e.getMessage(), e);
+
+            log.info(dump.toString());
+
+            exc.set(null);
+            dump.set(null);
+            thr.set(null);
+
+            fail("see all log");
+        }
+    }
+
+    /**
+     * Repeats the start/stop cycle 20 times, each time with a fresh background thread that creates and
+     * destroys caches. Fails if an exchange worker reported an uncaught exception during a pass.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCacheStopping2() throws Exception {
+        final int delta = 5;
+
+        final int itr = 20;
+
+        for (int j = 0; j < itr; j++) {
+            // Captured by anonymous classes below, hence final; named to avoid shadowing the static 'exc'.
+            final AtomicReference<Throwable> workerErr = new AtomicReference<>();
+
+            final AtomicBoolean asyncRun = new AtomicBoolean(true);
+
+            for (Thread worker : getExchangeWorkerThread()) {
+                worker.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+                    @Override public void uncaughtException(Thread t, Throwable e) {
+                        workerErr.set(e);
+                    }
+                });
+            }
+
+            Thread thread = new Thread(new Runnable() {
+                @Override public void run() {
+                    for (int start = 0; asyncRun.get(); start += delta)
+                        churnCaches(start, delta);
+                }
+            });
+
+            thread.start();
+
+            try {
+                for (int i = delta; i < ITERATIONS + delta; i++)
+                    startGrid(i);
+
+                U.sleep(500);
+
+                for (int i = delta; i < ITERATIONS + delta; i++)
+                    stopGrid(i);
+            }
+            finally {
+                // Always stop and join the background thread, even when a node failed to start or stop.
+                asyncRun.set(false);
+
+                thread.join();
+            }
+
+            Throwable e = workerErr.get();
+
+            if (e != null) {
+                // Keep the stack trace in the log; fail() only carries the message.
+                log.error(e.getMessage(), e);
+
+                fail(e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Runs one create/put/destroy cycle: creates caches through {@code ignite2}, puts one entry into
+     * every cache listed by {@code ignite2} and destroys every cache listed by {@code ignite1}.
+     *
+     * @param start First cache index of this cycle; also used as the key of the entry that is put.
+     * @param delta Number of cache pairs to create; also used as the value of the entry that is put.
+     */
+    private void churnCaches(int start, int delta) {
+        fillWithCache(ignite2, delta, start, affinityFunction());
+
+        for (String victim : ignite2.cacheNames())
+            ignite2.getOrCreateCache(victim).put(start, delta);
+
+        for (String victim : ignite1.cacheNames())
+            ignite1.destroyCache(victim);
+    }
+
+    /**
+     * @return Live threads whose name contains {@code "exchange-worker"}.
+     */
+    private Iterable<Thread> getExchangeWorkerThread() {
+        Collection<Thread> exchWorkers = new ArrayList<>();
+
+        for (Thread t : Thread.getAllStackTraces().keySet()) {
+            if (t.getName().contains("exchange-worker"))
+                exchWorkers.add(t);
+        }
+
+        return exchWorkers;
+    }
+
+    /**
+     * Put 2 * {@code iterations} caches inside ignite: one partitioned (1 backup) and one replicated
+     * cache for every index in {@code [start, start + iterations)}.
+     *
+     * @param ignite Node used to create the caches.
+     * @param iterations Number of cache pairs to create.
+     * @param start First index appended to the cache name prefixes.
+     * @param affinityFunction Affinity function set on every created cache configuration.
+     */
+    private static void fillWithCache(Ignite ignite, int iterations, int start, AffinityFunction affinityFunction) {
+        for (int i = start; i < iterations + start; i++) {
+            CacheConfiguration<Integer, Integer> cachePCfg = new CacheConfiguration<>();
+
+            cachePCfg.setName(CACHE_NAME_DHT_PARTITIONED + i);
+            cachePCfg.setCacheMode(CacheMode.PARTITIONED);
+            cachePCfg.setBackups(1);
+            cachePCfg.setAffinity(affinityFunction);
+
+            ignite.getOrCreateCache(cachePCfg);
+
+            CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>();
+
+            cacheRCfg.setName(CACHE_NAME_DHT_REPLICATED + i);
+            cacheRCfg.setCacheMode(CacheMode.REPLICATED);
+            // These two calls previously targeted cachePCfg by mistake, leaving this config without
+            // the affinity function under test.
+            cacheRCfg.setBackups(0);
+            cacheRCfg.setAffinity(affinityFunction);
+
+            ignite.getOrCreateCache(cacheRCfg);
+        }
+    }
+}
