http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java index 85a26ad..bc11448 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java @@ -18,8 +18,15 @@ package org.apache.ignite.internal.processors.cache.datastructures; import java.util.Collection; +import java.util.Timer; +import java.util.TimerTask; import java.util.UUID; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteAtomicLong; @@ -27,20 +34,27 @@ import org.apache.ignite.IgniteAtomicReference; import org.apache.ignite.IgniteAtomicSequence; import org.apache.ignite.IgniteAtomicStamped; import org.apache.ignite.IgniteCountDownLatch; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteQueue; import org.apache.ignite.IgniteSemaphore; +import org.apache.ignite.cache.CacheMode; import org.apache.ignite.configuration.AtomicConfiguration; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.typedef.CA; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; @@ -50,7 +64,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; */ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends IgniteCollectionAbstractTest { /** */ - private static final long TEST_TIMEOUT = 2 * 60 * 1000; + private static final long TEST_TIMEOUT = 3 * 60 * 1000; /** */ private static final String NEW_GRID_NAME = "newGrid"; @@ -67,6 +81,9 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig /** */ private static final int TOP_CHANGE_THREAD_CNT = 3; + /** */ + private boolean client; + /** {@inheritDoc} */ @Override protected long getTestTimeout() { return TEST_TIMEOUT; @@ -119,121 +136,106 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig cfg.setCacheConfiguration(ccfg); + if (client) { + cfg.setClientMode(client); + ((TcpDiscoverySpi)(cfg.getDiscoverySpi())).setForceServerMode(true); + } + return cfg; } /** * @throws Exception If failed. */ - public void testAtomicLongTopologyChange() throws Exception { - try (IgniteAtomicLong atomic = grid(0).atomicLong(STRUCTURE_NAME, 10, true)) { - Ignite g = startGrid(NEW_GRID_NAME); + public void testAtomicLongFailsWhenServersLeft() throws Exception { + client = true; + + Ignite ignite = startGrid(gridCount()); - assert g.atomicLong(STRUCTURE_NAME, 10, true).get() == 10; + new Timer().schedule(new TimerTask() { + @Override public void run() { + for (int i = 0; i < gridCount(); i++) + stopGrid(i); + } + }, 10_000); - assert g.atomicLong(STRUCTURE_NAME, 10, true).addAndGet(10) == 20; + long stopTime = U.currentTimeMillis() + TEST_TIMEOUT / 2; - stopGrid(NEW_GRID_NAME); + IgniteAtomicLong atomic = ignite.atomicLong(STRUCTURE_NAME, 10, true); + + try { + while (U.currentTimeMillis() < stopTime) + assertEquals(10, atomic.get()); + } + catch (IgniteException e) { + if (X.hasCause(e, ClusterTopologyServerNotFoundException.class)) + return; - assert grid(0).atomicLong(STRUCTURE_NAME, 10, true).get() == 20; + throw e; } + + fail(); } /** * @throws Exception If failed. */ - public void testAtomicLongConstantTopologyChange() throws Exception { - try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) { - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override - public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - String name = UUID.randomUUID().toString(); - - try { - Ignite g = startGrid(name); - - assert g.atomicLong(STRUCTURE_NAME, 1, true).get() > 0; - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - long val = s.get(); - - while (!fut.isDone()) { - assert s.get() == val; + public void testAtomicLongTopologyChange() throws Exception { + try (IgniteAtomicLong atomic = grid(0).atomicLong(STRUCTURE_NAME, 10, true)) { + Ignite g = startGrid(NEW_GRID_NAME); - assert s.incrementAndGet() == val + 1; + assertEquals(10, g.atomicLong(STRUCTURE_NAME, 10, false).get()); - val++; - } + assertEquals(20, g.atomicLong(STRUCTURE_NAME, 10, false).addAndGet(10)); - fut.get(); + stopGrid(NEW_GRID_NAME); - for (Ignite g : G.allGrids()) - assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, true).get()); + assertEquals(20, grid(0).atomicLong(STRUCTURE_NAME, 10, true).get()); } } /** * @throws Exception If failed. */ - public void testAtomicLongConstantMultipleTopologyChange() throws Exception { - try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) { - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection<String> names = new GridLeanSet<>(3); - - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); + public void testAtomicLongConstantTopologyChange() throws Exception { + doTestAtomicLong(new ConstantTopologyChangeWorker()); + } - names.add(name); + /** + * @throws Exception If failed. + */ + public void testAtomicLongConstantMultipleTopologyChange() throws Exception { + doTestAtomicLong(multipleTopologyChangeWorker()); + } - Ignite g = startGrid(name); + /** + * Tests IgniteAtomicLong. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestAtomicLong(ConstantTopologyChangeWorker topWorker) throws Exception { + try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) { + IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() { + @Override public Object apply(Ignite ignite) { + assert ignite.atomicLong(STRUCTURE_NAME, 1, true).get() > 0; - assert g.atomicLong(STRUCTURE_NAME, 1, true).get() > 0; - } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } + return null; } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }); long val = s.get(); while (!fut.isDone()) { - assert s.get() == val; - - assert s.incrementAndGet() == val + 1; + assertEquals(val, s.get()); - val++; + assertEquals(++val, s.incrementAndGet()); } fut.get(); for (Ignite g : G.allGrids()) - assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, true).get()); + assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, false).get()); } } @@ -244,13 +246,13 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig try (IgniteAtomicReference atomic = grid(0).atomicReference(STRUCTURE_NAME, 10, true)) { Ignite g = startGrid(NEW_GRID_NAME); - assert g.atomicReference(STRUCTURE_NAME, 10, true).get() == 10; + assertEquals((Integer)10, g.atomicReference(STRUCTURE_NAME, 10, false).get()); - g.atomicReference(STRUCTURE_NAME, 10, true).set(20); + g.atomicReference(STRUCTURE_NAME, 10, false).set(20); stopGrid(NEW_GRID_NAME); - assertEquals(20, (int) grid(0).atomicReference(STRUCTURE_NAME, 10, true).get()); + assertEquals((Integer)20, grid(0).atomicReference(STRUCTURE_NAME, 10, true).get()); } } @@ -258,85 +260,36 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testAtomicReferenceConstantTopologyChange() throws Exception { - try (IgniteAtomicReference<Integer> s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) { - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override - public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - String name = UUID.randomUUID().toString(); - - try { - Ignite g = startGrid(name); - - assert g.atomicReference(STRUCTURE_NAME, 1, true).get() > 0; - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - int val = s.get(); - - while (!fut.isDone()) { - assert s.get() == val; - - s.set(++val); - } - - fut.get(); - - for (Ignite g : G.allGrids()) - assertEquals(val, (int)g.atomicReference(STRUCTURE_NAME, 1, true).get()); - } + doTestAtomicReference(new ConstantTopologyChangeWorker()); } /** * @throws Exception If failed. */ public void testAtomicReferenceConstantMultipleTopologyChange() throws Exception { - try (IgniteAtomicReference<Integer> s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) { - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection<String> names = new GridLeanSet<>(3); - - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); - - names.add(name); + doTestAtomicReference(multipleTopologyChangeWorker()); + } - Ignite g = startGrid(name); + /** + * Tests atomic reference. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestAtomicReference(ConstantTopologyChangeWorker topWorker) throws Exception { + try (IgniteAtomicReference<Integer> s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) { + IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() { + @Override public Object apply(Ignite ignite) { + assert ignite.atomicReference(STRUCTURE_NAME, 1, false).get() > 0; - assert g.atomicReference(STRUCTURE_NAME, 1, true).get() > 0; - } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } + return null; } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }); int val = s.get(); while (!fut.isDone()) { - assert s.get() == val; + assertEquals(val, (int)s.get()); s.set(++val); } @@ -344,7 +297,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig fut.get(); for (Ignite g : G.allGrids()) - assert g.atomicReference(STRUCTURE_NAME, 1, true).get() == val; + assertEquals(val, (int)g.atomicReference(STRUCTURE_NAME, 1, true).get()); } } @@ -355,19 +308,19 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig try (IgniteAtomicStamped atomic = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, true)) { Ignite g = startGrid(NEW_GRID_NAME); - IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 10, 10, true).get(); + IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 10, 10, false).get(); - assert t.get1() == 10; - assert t.get2() == 10; + assertEquals((Integer)10, t.get1()); + assertEquals((Integer)10, t.get2()); - g.atomicStamped(STRUCTURE_NAME, 10, 10, true).set(20, 20); + g.atomicStamped(STRUCTURE_NAME, 10, 10, false).set(20, 20); stopGrid(NEW_GRID_NAME); - t = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, true).get(); + t = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, false).get(); - assert t.get1() == 20; - assert t.get2() == 20; + assertEquals((Integer)20, t.get1()); + assertEquals((Integer)20, t.get2()); } } @@ -375,107 +328,44 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testAtomicStampedConstantTopologyChange() throws Exception { - try (IgniteAtomicStamped<Integer, Integer> s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) { - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override - public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - String name = UUID.randomUUID().toString(); - - try { - Ignite g = startGrid(name); - - IgniteBiTuple<Integer, Integer> t = - g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get(); - - assert t.get1() > 0; - assert t.get2() > 0; - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - int val = s.value(); - - while (!fut.isDone()) { - IgniteBiTuple<Integer, Integer> t = s.get(); - - assert t.get1() == val; - assert t.get2() == val; - - val++; - - s.set(val, val); - } - - fut.get(); - - for (Ignite g : G.allGrids()) { - IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get(); - - assert t.get1() == val; - assert t.get2() == val; - } - } + doTestAtomicStamped(new ConstantTopologyChangeWorker()); } /** * @throws Exception If failed. */ public void testAtomicStampedConstantMultipleTopologyChange() throws Exception { - try (IgniteAtomicStamped<Integer, Integer> s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) { - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection<String> names = new GridLeanSet<>(3); - - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); - - names.add(name); + doTestAtomicStamped(multipleTopologyChangeWorker()); + } - Ignite g = startGrid(name); + /** + * Tests atomic stamped value. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestAtomicStamped(ConstantTopologyChangeWorker topWorker) throws Exception { + try (IgniteAtomicStamped<Integer, Integer> s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) { + IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() { + @Override public Object apply(Ignite ignite) { + IgniteBiTuple<Integer, Integer> t = ignite.atomicStamped(STRUCTURE_NAME, 1, 1, false).get(); - IgniteBiTuple<Integer, Integer> t = - g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get(); + assert t.get1() > 0; + assert t.get2() > 0; - assert t.get1() > 0; - assert t.get2() > 0; - } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } + return null; } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }); int val = s.value(); while (!fut.isDone()) { IgniteBiTuple<Integer, Integer> t = s.get(); - assert t.get1() == val; - assert t.get2() == val; + assertEquals(val, (int)t.get1()); + assertEquals(val, (int)t.get2()); - val++; + ++val; s.set(val, val); } @@ -483,10 +373,10 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig fut.get(); for (Ignite g : G.allGrids()) { - IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get(); + IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 1, 1, false).get(); - assert t.get1() == val; - assert t.get2() == val; + assertEquals(val, (int)t.get1()); + assertEquals(val, (int)t.get2()); } } } @@ -499,16 +389,16 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig try { Ignite g = startGrid(NEW_GRID_NAME); - assert g.countDownLatch(STRUCTURE_NAME, 20, true, true).count() == 20; + assertEquals(20, g.countDownLatch(STRUCTURE_NAME, 20, true, false).count()); - g.countDownLatch(STRUCTURE_NAME, 20, true, true).countDown(10); + g.countDownLatch(STRUCTURE_NAME, 20, true, false).countDown(10); stopGrid(NEW_GRID_NAME); - assert grid(0).countDownLatch(STRUCTURE_NAME, 20, true, true).count() == 10; + assertEquals(10, grid(0).countDownLatch(STRUCTURE_NAME, 20, true, false).count()); } finally { - grid(0).countDownLatch(STRUCTURE_NAME, 20, true, true).countDownAll(); + grid(0).countDownLatch(STRUCTURE_NAME, 20, true, false).countDownAll(); } } } @@ -517,6 +407,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testSemaphoreTopologyChange() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-1977"); try (IgniteSemaphore semaphore = grid(0).semaphore(STRUCTURE_NAME, 20, true, true)) { try { @@ -541,6 +432,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testSemaphoreConstantTopologyChange() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-1977"); + try (IgniteSemaphore s = grid(0).semaphore(STRUCTURE_NAME, 10, false, true)) { try { IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { @@ -595,6 +488,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testSemaphoreConstantTopologyChangeFailoverSafe() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-1977"); + try (IgniteSemaphore s = grid(0).semaphore(STRUCTURE_NAME, TOP_CHANGE_CNT, true, true)) { try { IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { @@ -656,6 +551,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testSemaphoreConstantMultipleTopologyChangeFailoverSafe() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-1977"); + final int numPermits = 3; try (IgniteSemaphore s = grid(0).semaphore(STRUCTURE_NAME, numPermits, true, true)) { @@ -728,6 +625,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testSemaphoreConstantTopologyChangeNotFailoverSafe() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-1977"); + try (IgniteSemaphore s = grid(0).semaphore(STRUCTURE_NAME, 1, false, true)) { try { IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { @@ -788,105 +687,48 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testCountDownLatchConstantTopologyChange() throws Exception { + doTestCountDownLatch(new ConstantTopologyChangeWorker()); + } + + /** + * @throws Exception If failed. + */ + public void testCountDownLatchConstantMultipleTopologyChange() throws Exception { + doTestCountDownLatch(multipleTopologyChangeWorker()); + } + + /** + * Tests distributed count down latch. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestCountDownLatch(ConstantTopologyChangeWorker topWorker) throws Exception { try (IgniteCountDownLatch s = grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true)) { try { - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - String name = UUID.randomUUID().toString(); - - try { - Ignite g = startGrid(name); + IgniteInternalFuture<?> fut = topWorker.startChangingTopology( + new IgniteClosure<Ignite, Object>() { + @Override public Object apply(Ignite ignite) { + assert ignite.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).count() > 0; - assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false) != null; - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } + return null; } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }); int val = s.count(); while (!fut.isDone()) { - assert s.count() == val; - - assert s.countDown() == val - 1; - - val--; + assertEquals(val, s.count()); + assertEquals(--val, s.countDown()); } fut.get(); for (Ignite g : G.allGrids()) - assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).count() == val; + assertEquals(val, g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).count()); } finally { - grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).countDownAll(); - } - } - } - - /** - * @throws Exception If failed. - */ - public void testCountDownLatchConstantMultipleTopologyChange() throws Exception { - try (IgniteCountDownLatch s = grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true)) { - try { - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection<String> names = new GridLeanSet<>(3); - - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); - - names.add(name); - - Ignite g = startGrid(name); - - assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false) != null; - } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - int val = s.count(); - - while (!fut.isDone()) { - assert s.count() == val; - - assert s.countDown() == val - 1; - - val--; - } - - fut.get(); - - for (Ignite g : G.allGrids()) - assertEquals(val, g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).count()); - } - finally { - grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).countDownAll(); + grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).countDownAll(); } } } @@ -900,13 +742,13 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig Ignite g = startGrid(NEW_GRID_NAME); - assert g.<Integer>queue(STRUCTURE_NAME, 0, null).poll() == 10; + assertEquals(10, (int)g.<Integer>queue(STRUCTURE_NAME, 0, null).poll()); g.queue(STRUCTURE_NAME, 0, null).put(20); stopGrid(NEW_GRID_NAME); - assert grid(0).<Integer>queue(STRUCTURE_NAME, 0, null).peek() == 20; + assertEquals(20, (int)grid(0).<Integer>queue(STRUCTURE_NAME, 0, null).peek()); } finally { grid(0).<Integer>queue(STRUCTURE_NAME, 0, null).close(); @@ -917,107 +759,138 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testQueueConstantTopologyChange() throws Exception { + doTestQueue(new ConstantTopologyChangeWorker()); + } + + /** + * @throws Exception If failed. + */ + public void testQueueConstantMultipleTopologyChange() throws Exception { + doTestQueue(multipleTopologyChangeWorker()); + } + + /** + * Tests the queue. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestQueue(ConstantTopologyChangeWorker topWorker) throws Exception { + int queueMaxSize = 100; + try (IgniteQueue<Integer> s = grid(0).queue(STRUCTURE_NAME, 0, config(false))) { s.put(1); - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - String name = UUID.randomUUID().toString(); + IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() { + @Override public Object apply(Ignite ignite) { + IgniteQueue<Integer> queue = ignite.queue(STRUCTURE_NAME, 0, null); - try { - Ignite g = startGrid(name); + assertNotNull(queue); - assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() > 0; - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } + Integer val = queue.peek(); + + assertNotNull(val); + + assert val > 0; + + return null; } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }); int val = s.peek(); - int origVal = val; + while (!fut.isDone()) { + if (s.size() == queueMaxSize) { + int last = 0; + + for (int i = 0, size = s.size() - 1; i < size; i++) { + int cur = s.poll(); + + if (i == 0) { + last = cur; + + continue; + } + + assertEquals(last, cur - 1); + + last = cur; + } + } - while (!fut.isDone()) s.put(++val); + } fut.get(); + val = s.peek(); + for (Ignite g : G.allGrids()) - assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() == origVal; + assertEquals(val, (int)g.<Integer>queue(STRUCTURE_NAME, 0, null).peek()); } } /** * @throws Exception If failed. */ - public void testQueueConstantMultipleTopologyChange() throws Exception { - try (IgniteQueue<Integer> s = grid(0).queue(STRUCTURE_NAME, 0, config(false))) { - s.put(1); + public void testAtomicSequenceInitialization() throws Exception { + int threadCnt = 3; - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection<String> names = new GridLeanSet<>(3); + final AtomicInteger idx = new AtomicInteger(gridCount()); - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { + @Override public void apply() { + int id = idx.getAndIncrement(); - names.add(name); + try { + startGrid(id); - Ignite g = startGrid(name); + Thread.sleep(1000); - assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() > 0; - } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + catch (Exception e) { + throw F.wrap(e); + } + finally { + stopGrid(id); - int val = s.peek(); + info("Thread finished."); + } + } + }, threadCnt, "test-thread"); - int origVal = val; + while (!fut.isDone()) { + grid(0).compute().call(new IgniteCallable<Object>() { + /** */ + @IgniteInstanceResource + private Ignite g; - while (!fut.isDone()) - s.put(++val); + @Override public Object call() throws Exception { + IgniteAtomicSequence seq = g.atomicSequence(STRUCTURE_NAME, 1, true); - fut.get(); + assert seq != null; - for (Ignite g : G.allGrids()) - assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() == origVal; + for (int i = 0; i < 1000; i++) + seq.getAndIncrement(); + + return null; + } + }); } + + fut.get(); } /** * @throws Exception If failed. */ public void testAtomicSequenceTopologyChange() throws Exception { - try (IgniteAtomicSequence s = grid().atomicSequence(STRUCTURE_NAME, 10, true)) { + try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 10, true)) { Ignite g = startGrid(NEW_GRID_NAME); - assert g.atomicSequence(STRUCTURE_NAME, 10, false).get() == 1010; + assertEquals(1010, g.atomicSequence(STRUCTURE_NAME, 10, false).get()); - assert g.atomicSequence(STRUCTURE_NAME, 10, false).addAndGet(10) == 1020; + assertEquals(1020, g.atomicSequence(STRUCTURE_NAME, 10, false).addAndGet(10)); stopGrid(NEW_GRID_NAME); } @@ -1027,29 +900,31 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testAtomicSequenceConstantTopologyChange() throws Exception { - try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) { - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - String name = UUID.randomUUID().toString(); + doTestAtomicSequence(new ConstantTopologyChangeWorker()); + } - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - try { - Ignite g = startGrid(name); + /** + * @throws Exception If failed. + */ + public void testAtomicSequenceConstantMultipleTopologyChange() throws Exception { + doTestAtomicSequence(multipleTopologyChangeWorker()); + } - assertTrue(g.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0); - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } + /** + * Tests atomic sequence. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestAtomicSequence(ConstantTopologyChangeWorker topWorker) throws Exception { + try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) { + IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() { + @Override public Object apply(Ignite ignite) { + assertTrue(ignite.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0); + + return null; } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }); long old = s.get(); @@ -1070,135 +945,228 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig /** * @throws Exception If failed. */ - public void testAtomicSequenceInitialization() throws Exception { - int threadCnt = 3; + public void testUncommitedTxLeave() throws Exception { + final int val = 10; - final AtomicInteger idx = new AtomicInteger(gridCount()); + grid(0).atomicLong(STRUCTURE_NAME, val, true); - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - int id = idx.getAndIncrement(); + GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Ignite g = startGrid(NEW_GRID_NAME); try { - startGrid(id); + g.transactions().txStart(); - Thread.sleep(1000); + g.cache(TRANSACTIONAL_CACHE_NAME).put(1, 1); - } - catch (Exception e) { - throw F.wrap(e); + assertEquals(val + 1, g.atomicLong(STRUCTURE_NAME, val, false).incrementAndGet()); } finally { - stopGrid(id); - - info("Thread finished."); + stopGrid(NEW_GRID_NAME); } + + return null; } - }, threadCnt, "test-thread"); + }).get(); - while (!fut.isDone()) { - grid(0).compute().call(new IgniteCallable<Object>() { - /** */ - @IgniteInstanceResource - private Ignite g; + waitForDiscovery(G.allGrids().toArray(new Ignite[gridCount()])); - @Override public Object call() throws Exception { - IgniteAtomicSequence seq = g.atomicSequence(STRUCTURE_NAME, 1, true); + assertEquals(val + 1, grid(0).atomicLong(STRUCTURE_NAME, val, false).get()); + } - assert seq != null; + /** + * @return Specific multiple topology change worker implementation. + */ + private ConstantTopologyChangeWorker multipleTopologyChangeWorker() { + return collectionCacheMode() == CacheMode.PARTITIONED ? new PartitionedMultipleTopologyChangeWorker() : + new MultipleTopologyChangeWorker(); + } - for (int i = 0; i < 1000; i++) - seq.getAndIncrement(); + /** + * + */ + private class ConstantTopologyChangeWorker { + /** */ + protected final AtomicBoolean failed = new AtomicBoolean(false); + + /** + * Starts changing cluster's topology. + * + * @return Future. + */ + IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) { + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { + @Override public void apply() { + try { + for (int i = 0; i < TOP_CHANGE_CNT; i++) { + if (failed.get()) + return; - return null; + String name = UUID.randomUUID().toString(); + + try { + Ignite g = startGrid(name); + + callback.apply(g); + } + finally { + if (i != TOP_CHANGE_CNT - 1) + stopGrid(name); + } + } } - }); - } + catch (Exception e) { + if (failed.compareAndSet(false, true)) + throw F.wrap(e); + } + } + }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - fut.get(); + return fut; + } } /** - * @throws Exception If failed. + * */ - public void testAtomicSequenceConstantMultipleTopologyChange() throws Exception { - try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) { + private class MultipleTopologyChangeWorker extends ConstantTopologyChangeWorker { + /** + * Starts changing cluster's topology. + * + * @return Future. + */ + @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) { IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { @Override public void apply() { try { for (int i = 0; i < TOP_CHANGE_CNT; i++) { + if (failed.get()) + return; + Collection<String> names = new GridLeanSet<>(3); try { for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); + if (failed.get()) + return; - names.add(name); + String name = UUID.randomUUID().toString(); Ignite g = startGrid(name); - assertTrue(g.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0); + names.add(name); + + callback.apply(g); } } finally { - if (i != TOP_CHANGE_CNT - 1) + if (i != TOP_CHANGE_CNT - 1) { for (String name : names) stopGrid(name); + } } } } catch (Exception e) { - throw F.wrap(e); + if (failed.compareAndSet(false, true)) + throw F.wrap(e); } } }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - long old = s.get(); - - while (!fut.isDone()) { - assertEquals(old, s.get()); - - long val = s.incrementAndGet(); - - assertTrue(val > old); - - old = val; - } - - fut.get(); + return fut; } } /** - * @throws Exception If failed. + * */ - public void testUncommitedTxLeave() throws Exception { - final int val = 10; + private class PartitionedMultipleTopologyChangeWorker extends ConstantTopologyChangeWorker { + /** */ + private CyclicBarrier barrier; + + /** + * Starts changing cluster's topology. + * + * @return Future. + */ + @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) { + final Semaphore sem = new Semaphore(TOP_CHANGE_THREAD_CNT); + + final ConcurrentSkipListSet<String> startedNodes = new ConcurrentSkipListSet<>(); + + barrier = new CyclicBarrier(TOP_CHANGE_THREAD_CNT, new Runnable() { + @Override public void run() { + try { + assertEquals(TOP_CHANGE_THREAD_CNT * 3, startedNodes.size()); - grid(0).atomicLong(STRUCTURE_NAME, val, true); + for (String name : startedNodes) { + stopGrid(name, false); - GridTestUtils.runAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - Ignite g = startGrid(NEW_GRID_NAME); + awaitPartitionMapExchange(); + } - try { - g.transactions().txStart(); + startedNodes.clear(); + sem.release(TOP_CHANGE_THREAD_CNT); - g.cache(TRANSACTIONAL_CACHE_NAME).put(1, 1); + barrier.reset(); + } + catch (Exception e) { + if (failed.compareAndSet(false, true)) { + sem.release(TOP_CHANGE_THREAD_CNT); - assert g.atomicLong(STRUCTURE_NAME, val, false).incrementAndGet() == val + 1; - } - finally { - stopGrid(NEW_GRID_NAME); + barrier.reset(); + + throw F.wrap(e); + } + } } + }); - return null; - } - }).get(); + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { + @Override public void apply() { + try { + for (int i = 0; i < TOP_CHANGE_CNT; i++) { + sem.acquire(); - waitForDiscovery(G.allGrids().toArray(new Ignite[gridCount()])); + if (failed.get()) + return; + + for (int j = 0; j < 3; j++) { + if (failed.get()) + return; + + String name = UUID.randomUUID().toString(); + + startedNodes.add(name); - assert grid(0).atomicLong(STRUCTURE_NAME, val, false).get() == val + 1; + Ignite g = startGrid(name); + + callback.apply(g); + } + + try { + barrier.await(); + } + catch (BrokenBarrierException e) { + // Ignore. + } + } + } + catch (Exception e) { + if (failed.compareAndSet(false, true)) { + sem.release(TOP_CHANGE_THREAD_CNT); + + barrier.reset(); + + throw F.wrap(e); + } + } + } + }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + + return fut; + } } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java index 18b0b21..6c880a5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java @@ -32,11 +32,6 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED; public class GridCachePartitionedDataStructuresFailoverSelfTest extends GridCacheAbstractDataStructuresFailoverSelfTest { /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-803"); - } - - /** {@inheritDoc} */ @Override protected CacheMode collectionCacheMode() { return PARTITIONED; } @@ -50,4 +45,4 @@ public class GridCachePartitionedDataStructuresFailoverSelfTest @Override protected CacheAtomicityMode collectionCacheAtomicityMode() { return TRANSACTIONAL; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java index 86b763a..b3ded7f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java @@ -24,16 +24,10 @@ import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; /** * Failover tests for cache data structures. */ -public class GridCachePartitionedOffheapDataStructuresFailoverSelfTest extends GridCachePartitionedDataStructuresFailoverSelfTest { - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-803"); - } - +public class GridCachePartitionedOffheapDataStructuresFailoverSelfTest + extends GridCachePartitionedDataStructuresFailoverSelfTest { /** {@inheritDoc} */ @Override protected CacheMemoryMode collectionMemoryMode() { return OFFHEAP_TIERED; } - - -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java index d0131d6..28ce901 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java @@ -32,11 +32,6 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED; public class GridCacheReplicatedDataStructuresFailoverSelfTest extends GridCacheAbstractDataStructuresFailoverSelfTest { /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-801"); - } - - /** {@inheritDoc} */ @Override protected CacheMode collectionCacheMode() { return REPLICATED; } http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java index 19daa26..c00557d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java @@ -150,6 +150,8 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract * @throws Exception If failed. */ public void testClientQueueCreateCloseFailover() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-1976"); + testFailoverWithClient(new IgniteInClosure<Ignite>() { @Override public void apply(Ignite ignite) { for (int i = 0; i < 100; i++) { http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java index caca2ca..94dc665 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java @@ -180,7 +180,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> * @return Timeout. */ protected long awaitForSocketWriteTimeout() { - return 5000; + return 8000; } /** @@ -742,4 +742,4 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> nodes.clear(); spiRsrcs.clear(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java index 344efc0..6b20b2a 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java @@ -273,7 +273,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov } try { - assertTrue(latch.await(10_000, TimeUnit.MILLISECONDS)); + assertTrue(latch.await(failureThreshold + 3000, TimeUnit.MILLISECONDS)); assertFalse("Unexpected event, see log for details.", err.get()); assertEquals(nodeId, client.cluster().localNode().id()); @@ -331,4 +331,4 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov err = null; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 379a3a6..42960e7 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -373,6 +373,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private void testFailureDetectionOnNodePing(Ignite pingingNode, Ignite failedNode) throws Exception { final CountDownLatch cnt = new CountDownLatch(1); + final UUID failedNodeId = failedNode.cluster().localNode().id(); + pingingNode.events().localListen( new IgnitePredicate<Event>() { @Override public boolean apply(Event evt) { @@ -390,9 +392,9 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { TcpDiscoverySpi spi = discoMap.get(pingingNode.name()); - boolean res = spi.pingNode(failedNode.cluster().localNode().id()); + boolean res = spi.pingNode(failedNodeId); - assertFalse("Ping is ok for node " + failedNode.cluster().localNode().id() + ", but had to fail.", res); + assertFalse("Ping is ok for node " + failedNodeId + ", but had to fail.", res); // Heartbeat interval is 40 seconds, but we should detect node failure faster. assert cnt.await(7, SECONDS); @@ -409,6 +411,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { ((TestTcpDiscoverySpi)failedNode.configuration().getDiscoverySpi()).ignorePingResponse = true; + final UUID failedNodeId = failedNode.cluster().localNode().id(); + final CountDownLatch pingLatch = new CountDownLatch(1); final CountDownLatch eventLatch = new CountDownLatch(1); @@ -422,7 +426,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { pingingNode.events().localListen( new IgnitePredicate<Event>() { @Override public boolean apply(Event event) { - if (((DiscoveryEvent)event).eventNode().id().equals(failedNode.cluster().localNode().id())) { + if (((DiscoveryEvent)event).eventNode().id().equals(failedNodeId)) { failRes.set(true); eventLatch.countDown(); } @@ -438,7 +442,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { pingLatch.countDown(); pingRes.set(pingingNode.configuration().getDiscoverySpi().pingNode( - failedNode.cluster().localNode().id())); + failedNodeId)); return null; } @@ -1166,7 +1170,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { for (IgniteKernal grid : grids) assertEquals(startTime, (Long)grid.context().discovery().gridStartTime()); - grids.add((IgniteKernal) startGrid(5)); + grids.add((IgniteKernal)startGrid(5)); for (IgniteKernal grid : grids) assertEquals(startTime, (Long)grid.context().discovery().gridStartTime()); @@ -1326,6 +1330,61 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed + */ + public void testNodeShutdownOnRingMessageWorkerFailure() throws Exception { + try { + TestMessageWorkerFailureSpi spi0 = new TestMessageWorkerFailureSpi(); + + nodeSpi.set(spi0); + + final Ignite ignite0 = startGrid(0); + + nodeSpi.set(new TcpDiscoverySpi()); + + Ignite ignite1 = startGrid(1); + + final AtomicBoolean disconnected = new AtomicBoolean(); + + final CountDownLatch latch = new CountDownLatch(1); + + final UUID failedNodeId = ignite0.cluster().localNode().id(); + + ignite1.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event event) { + if (event.type() == EventType.EVT_NODE_FAILED && + failedNodeId.equals(((DiscoveryEvent)event).eventNode().id())) + disconnected.set(true); + + latch.countDown(); + + return false; + } + }, EventType.EVT_NODE_FAILED); + + spi0.stop = true; + + latch.await(15, TimeUnit.SECONDS); + + assertTrue(disconnected.get()); + + try { + ignite0.cluster().localNode().id(); + } + catch (IllegalStateException e) { + if (e.getMessage().contains("Grid is in invalid state to perform this operation")) + return; + } + + fail(); + } + finally { + stopAllGrids(); + } + } + + + /** * @param twoNodes If {@code true} starts two nodes, otherwise three. * @throws Exception If failed */ @@ -1891,6 +1950,25 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } /** + * + */ + private static class TestMessageWorkerFailureSpi extends TcpDiscoverySpi { + /** */ + private volatile boolean stop; + + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, + GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + + if (stop) + throw new RuntimeException("Failing ring message worker explicitly"); + + super.writeToSocket(sock, msg, bout, timeout); + } + } + + /** * Starts new grid with given index. Method optimize is not invoked. * * @param idx Index of the grid to start. @@ -1911,4 +1989,4 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private Ignite startGridNoOptimize(String gridName) throws Exception { return G.start(getConfiguration(gridName)); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 6f9c559..1fd4cb1 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -856,7 +856,7 @@ public abstract class GridAbstractTest extends TestCase { List<Ignite> ignites = G.allGrids(); for (Ignite g : ignites) { - if (g.cluster().localNode().isClient()) + if (g.configuration().getDiscoverySpi().isClientMode()) stopGrid(g.name(), cancel); } } @@ -868,7 +868,7 @@ public abstract class GridAbstractTest extends TestCase { List<Ignite> ignites = G.allGrids(); for (Ignite g : ignites) { - if (!g.cluster().localNode().isClient()) + if (!g.configuration().getDiscoverySpi().isClientMode()) stopGrid(g.name(), cancel); } } @@ -2065,4 +2065,4 @@ public abstract class GridAbstractTest extends TestCase { */ public abstract void run(Ignite ignite, IgniteCache<K, V> cache) throws Exception; } -} \ No newline at end of file +}
