Repository: ignite Updated Branches: refs/heads/ignite-801 de632ace6 -> df42d4a94
ignite-801: properly processing cluster topology exception when atomic stamped value is being modified. Finished refactoring the tests. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/df42d4a9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/df42d4a9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/df42d4a9 Branch: refs/heads/ignite-801 Commit: df42d4a944178999594fd79fbe0ce45db4596442 Parents: de632ac Author: Denis Magda <[email protected]> Authored: Mon Sep 14 13:33:16 2015 +0300 Committer: Denis Magda <[email protected]> Committed: Mon Sep 14 13:33:16 2015 +0300 ---------------------------------------------------------------------- .../datastructures/DataStructuresProcessor.java | 23 +- .../GridCacheCountDownLatchImpl.java | 1 + ...eAbstractDataStructuresFailoverSelfTest.java | 754 +++++++------------ ...edOffheapDataStructuresFailoverSelfTest.java | 2 - ...eplicatedDataStructuresFailoverSelfTest.java | 5 - 5 files changed, 286 insertions(+), 499 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/df42d4a9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index a5561e9..ef66635 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -489,21 +489,23 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { if (dataStructure != null) return dataStructure; - if (!create) - return c.applyx(); - while (true) { - try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { - err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get(); + try { + if (!create) + return c.applyx(); - if (err != null) - throw err; + try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get(); - dataStructure = c.applyx(); + if (err != null) + throw err; - tx.commit(); + dataStructure = c.applyx(); + + tx.commit(); - return dataStructure; + return dataStructure; + } } catch (ClusterTopologyCheckedException e) { IgniteInternalFuture<?> fut = e.retryReadyFuture(); @@ -513,6 +515,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { catch (IgniteTxRollbackCheckedException ignore) { // Safe to retry right away. } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/df42d4a9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java index cdd5f90..01d8c58 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java @@ -344,6 +344,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc @Override public Integer call() throws Exception { Integer val; + //REMOVE TR try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheCountDownLatchValue latchVal = latchView.get(key); http://git-wip-us.apache.org/repos/asf/ignite/blob/df42d4a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java index 2fd40f6..0b12d63 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java @@ -20,7 +20,9 @@ package org.apache.ignite.internal.processors.cache.datastructures; import java.util.Collection; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteAtomicLong; import org.apache.ignite.IgniteAtomicReference; @@ -38,6 +40,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.testframework.GridTestUtils; @@ -65,6 +68,15 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig /** */ private static final int TOP_CHANGE_THREAD_CNT = 3; + /** */ + private static final int TOP_CHANGED_ERR_RETRY_CNT = 5; + + /** */ + private static final long TOP_CHANGED_ERR_RETRY_TIMEOUT = 3000; + + /** */ + private static final long READY_FUTURE_WAIT_TIMEOUT = 10_000; + /** {@inheritDoc} */ @Override protected long getTestTimeout() { return TEST_TIMEOUT; @@ -127,13 +139,13 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig try (IgniteAtomicLong atomic = grid(0).atomicLong(STRUCTURE_NAME, 10, true)) { Ignite g = startGrid(NEW_GRID_NAME); - assert g.atomicLong(STRUCTURE_NAME, 10, true).get() == 10; + assertEquals(10, g.atomicLong(STRUCTURE_NAME, 10, false).get()); - assert g.atomicLong(STRUCTURE_NAME, 10, true).addAndGet(10) == 20; + assertEquals(20, g.atomicLong(STRUCTURE_NAME, 10, false).addAndGet(10)); stopGrid(NEW_GRID_NAME); - assert grid(0).atomicLong(STRUCTURE_NAME, 10, true).get() == 20; + assertEquals(20, grid(0).atomicLong(STRUCTURE_NAME, 10, true).get()); } } @@ -141,97 +153,44 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testAtomicLongConstantTopologyChange() throws Exception { - try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) { - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override - public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - String name = UUID.randomUUID().toString(); - - try { - Ignite g = startGrid(name); - - assert g.atomicLong(STRUCTURE_NAME, 1, true).get() > 0; - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - long val = s.get(); - - while (!fut.isDone()) { - assert s.get() == val; - - assert s.incrementAndGet() == val + 1; - - val++; - } - - fut.get(); - - for (Ignite g : G.allGrids()) - assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, true).get()); - } + doTestAtomicLong(new ConstantTopologyChangeWorker()); } /** * @throws Exception If failed. */ public void testAtomicLongConstantMultipleTopologyChange() throws Exception { - try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) { - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection<String> names = new GridLeanSet<>(3); - - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); - - names.add(name); + doTestAtomicLong(new ConstantMultipleTopologyChangeWorker()); + } - Ignite g = startGrid(name); + /** + * Tests IgniteAtomicLong. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestAtomicLong(ConstantTopologyChangeWorker topWorker) throws Exception { + try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) { + IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() { + @Override public Object apply(Ignite ignite) { + assert ignite.atomicLong(STRUCTURE_NAME, 1, true).get() > 0; - assert g.atomicLong(STRUCTURE_NAME, 1, true).get() > 0; - } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } + return null; } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }); long val = s.get(); while (!fut.isDone()) { - assert s.get() == val; + assertEquals(val, s.get()); - assert s.incrementAndGet() == val + 1; - - val++; + assertEquals(++val, s.incrementAndGet()); } fut.get(); for (Ignite g : G.allGrids()) - assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, true).get()); + assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, false).get()); } } @@ -242,13 +201,13 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig try (IgniteAtomicReference atomic = grid(0).atomicReference(STRUCTURE_NAME, 10, true)) { Ignite g = startGrid(NEW_GRID_NAME); - assert g.atomicReference(STRUCTURE_NAME, 10, true).get() == 10; + assertEquals((Integer)10, g.atomicReference(STRUCTURE_NAME, 10, false).get()); - g.atomicReference(STRUCTURE_NAME, 10, true).set(20); + g.atomicReference(STRUCTURE_NAME, 10, false).set(20); stopGrid(NEW_GRID_NAME); - assertEquals(20, (int) grid(0).atomicReference(STRUCTURE_NAME, 10, true).get()); + assertEquals((Integer)20, grid(0).atomicReference(STRUCTURE_NAME, 10, true).get()); } } @@ -256,85 +215,36 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testAtomicReferenceConstantTopologyChange() throws Exception { - try (IgniteAtomicReference<Integer> s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) { - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override - public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - String name = UUID.randomUUID().toString(); - - try { - Ignite g = startGrid(name); - - assert g.atomicReference(STRUCTURE_NAME, 1, true).get() > 0; - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - int val = s.get(); - - while (!fut.isDone()) { - assert s.get() == val; - - s.set(++val); - } - - fut.get(); - - for (Ignite g : G.allGrids()) - assertEquals(val, (int)g.atomicReference(STRUCTURE_NAME, 1, true).get()); - } + doTestAtomicReference(new ConstantTopologyChangeWorker()); } /** * @throws Exception If failed. */ public void testAtomicReferenceConstantMultipleTopologyChange() throws Exception { - try (IgniteAtomicReference<Integer> s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) { - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection<String> names = new GridLeanSet<>(3); - - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); - - names.add(name); + doTestAtomicReference(new ConstantMultipleTopologyChangeWorker()); + } - Ignite g = startGrid(name); + /** + * Tests atomic reference. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestAtomicReference(ConstantTopologyChangeWorker topWorker) throws Exception { + try (IgniteAtomicReference<Integer> s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) { + IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() { + @Override public Object apply(Ignite ignite) { + assert ignite.atomicReference(STRUCTURE_NAME, 1, false).get() > 0; - assert g.atomicReference(STRUCTURE_NAME, 1, true).get() > 0; - } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } + return null; } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }); int val = s.get(); while (!fut.isDone()) { - assert s.get() == val; + assertEquals(val, (int)s.get()); s.set(++val); } @@ -342,7 +252,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig fut.get(); for (Ignite g : G.allGrids()) - assert g.atomicReference(STRUCTURE_NAME, 1, true).get() == val; + assertEquals(val, (int)g.atomicReference(STRUCTURE_NAME, 1, true).get()); } } @@ -353,19 +263,19 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig try (IgniteAtomicStamped atomic = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, true)) { Ignite g = startGrid(NEW_GRID_NAME); - IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 10, 10, true).get(); + IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 10, 10, false).get(); - assert t.get1() == 10; - assert t.get2() == 10; + assertEquals((Integer)10, t.get1()); + assertEquals((Integer)10, t.get2()); - g.atomicStamped(STRUCTURE_NAME, 10, 10, true).set(20, 20); + g.atomicStamped(STRUCTURE_NAME, 10, 10, false).set(20, 20); stopGrid(NEW_GRID_NAME); - t = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, true).get(); + t = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, false).get(); - assert t.get1() == 20; - assert t.get2() == 20; + assertEquals((Integer)20, t.get1()); + assertEquals((Integer)20, t.get2()); } } @@ -373,107 +283,44 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testAtomicStampedConstantTopologyChange() throws Exception { - try (IgniteAtomicStamped<Integer, Integer> s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) { - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override - public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - String name = UUID.randomUUID().toString(); - - try { - Ignite g = startGrid(name); - - IgniteBiTuple<Integer, Integer> t = - g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get(); - - assert t.get1() > 0; - assert t.get2() > 0; - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - int val = s.value(); - - while (!fut.isDone()) { - IgniteBiTuple<Integer, Integer> t = s.get(); - - assert t.get1() == val; - assert t.get2() == val; - - val++; - - s.set(val, val); - } - - fut.get(); - - for (Ignite g : G.allGrids()) { - IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get(); - - assert t.get1() == val; - assert t.get2() == val; - } - } + doTestAtomicStamped(new ConstantTopologyChangeWorker()); } /** * @throws Exception If failed. */ public void testAtomicStampedConstantMultipleTopologyChange() throws Exception { - try (IgniteAtomicStamped<Integer, Integer> s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) { - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection<String> names = new GridLeanSet<>(3); - - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); - - names.add(name); + doTestAtomicStamped(new ConstantMultipleTopologyChangeWorker()); + } - Ignite g = startGrid(name); + /** + * Tests atomic stamped value. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestAtomicStamped(ConstantTopologyChangeWorker topWorker) throws Exception { + try (IgniteAtomicStamped<Integer, Integer> s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) { + IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() { + @Override public Object apply(Ignite ignite) { + IgniteBiTuple<Integer, Integer> t = ignite.atomicStamped(STRUCTURE_NAME, 1, 1, false).get(); - IgniteBiTuple<Integer, Integer> t = - g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get(); + assert t.get1() > 0; + assert t.get2() > 0; - assert t.get1() > 0; - assert t.get2() > 0; - } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } + return null; } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }); int val = s.value(); while (!fut.isDone()) { IgniteBiTuple<Integer, Integer> t = s.get(); - assert t.get1() == val; - assert t.get2() == val; + assertEquals(val, (int)t.get1()); + assertEquals(val, (int)t.get2()); - val++; + ++val; s.set(val, val); } @@ -481,10 +328,10 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig fut.get(); for (Ignite g : G.allGrids()) { - IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get(); + IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 1, 1, false).get(); - assert t.get1() == val; - assert t.get2() == val; + assertEquals(val, (int)t.get1()); + assertEquals(val, (int)t.get2()); } } } @@ -497,16 +344,16 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig try { Ignite g = startGrid(NEW_GRID_NAME); - assert g.countDownLatch(STRUCTURE_NAME, 20, true, true).count() == 20; + assertEquals(20, g.countDownLatch(STRUCTURE_NAME, 20, true, false).count()); - g.countDownLatch(STRUCTURE_NAME, 20, true, true).countDown(10); + g.countDownLatch(STRUCTURE_NAME, 20, true, false).countDown(10); stopGrid(NEW_GRID_NAME); - assert grid(0).countDownLatch(STRUCTURE_NAME, 20, true, true).count() == 10; + assertEquals(10, grid(0).countDownLatch(STRUCTURE_NAME, 20, true, false).count()); } finally { - grid(0).countDownLatch(STRUCTURE_NAME, 20, true, true).countDownAll(); + grid(0).countDownLatch(STRUCTURE_NAME, 20, true, false).countDownAll(); } } } @@ -515,102 +362,45 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testCountDownLatchConstantTopologyChange() throws Exception { - try (IgniteCountDownLatch s = grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true)) { - try { - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - String name = UUID.randomUUID().toString(); - - try { - Ignite g = startGrid(name); - - assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false) != null; - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - int val = s.count(); - - while (!fut.isDone()) { - assert s.count() == val; - - assert s.countDown() == val - 1; - - val--; - } - - fut.get(); - - for (Ignite g : G.allGrids()) - assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).count() == val; - } - finally { - grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).countDownAll(); - } - } + doTestCountDownLatch(new ConstantTopologyChangeWorker()); } /** * @throws Exception If failed. */ public void testCountDownLatchConstantMultipleTopologyChange() throws Exception { + doTestCountDownLatch(new ConstantMultipleTopologyChangeWorker()); + } + + /** + * Tests distributed count down latch. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestCountDownLatch(ConstantTopologyChangeWorker topWorker) throws Exception { try (IgniteCountDownLatch s = grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true)) { try { - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection<String> names = new GridLeanSet<>(3); - - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); - - names.add(name); - - Ignite g = startGrid(name); + IgniteInternalFuture<?> fut = topWorker.startChangingTopology( + new IgniteClosure<Ignite, Object>() { + @Override public Object apply(Ignite ignite) { + assert ignite.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).count() > 0; - assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false) != null; - } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); + return null; } - } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }); int val = s.count(); while (!fut.isDone()) { - assert s.count() == val; - - assert s.countDown() == val - 1; - - val--; + assertEquals(val, s.count()); + assertEquals(--val, s.countDown()); } fut.get(); for (Ignite g : G.allGrids()) - assertEquals(val, g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).count()); + assertEquals(val, g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).count()); } finally { grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).countDownAll(); @@ -627,13 +417,13 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig Ignite g = startGrid(NEW_GRID_NAME); - assert g.<Integer>queue(STRUCTURE_NAME, 0, null).poll() == 10; + assertEquals(10, (int)g.<Integer>queue(STRUCTURE_NAME, 0, null).poll()); g.queue(STRUCTURE_NAME, 0, null).put(20); stopGrid(NEW_GRID_NAME); - assert grid(0).<Integer>queue(STRUCTURE_NAME, 0, null).peek() == 20; + assertEquals(20, (int)grid(0).<Integer>queue(STRUCTURE_NAME, 0, null).peek()); } finally { grid(0).<Integer>queue(STRUCTURE_NAME, 0, null).close(); @@ -644,31 +434,33 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testQueueConstantTopologyChange() throws Exception { + doTestQueue(new ConstantTopologyChangeWorker()); + } + + /** + * @throws Exception If failed. + */ + public void testQueueConstantMultipleTopologyChange() throws Exception { + doTestQueue(new ConstantMultipleTopologyChangeWorker()); + } + + /** + * Tests the queue. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestQueue(ConstantTopologyChangeWorker topWorker) throws Exception { try (IgniteQueue<Integer> s = grid(0).queue(STRUCTURE_NAME, 0, config(false))) { s.put(1); - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - String name = UUID.randomUUID().toString(); - - try { - Ignite g = startGrid(name); + IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() { + @Override public Object apply(Ignite ignite) { + assert ignite.<Integer>queue(STRUCTURE_NAME, 0, null).peek() > 0; - assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() > 0; - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } + return null; } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }); int val = s.peek(); @@ -680,71 +472,71 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig fut.get(); for (Ignite g : G.allGrids()) - assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() == origVal; + assertEquals(origVal, (int)g.<Integer>queue(STRUCTURE_NAME, 0, null).peek()); } } /** * @throws Exception If failed. */ - public void testQueueConstantMultipleTopologyChange() throws Exception { - try (IgniteQueue<Integer> s = grid(0).queue(STRUCTURE_NAME, 0, config(false))) { - s.put(1); + public void testAtomicSequenceInitialization() throws Exception { + int threadCnt = 3; - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection<String> names = new GridLeanSet<>(3); + final AtomicInteger idx = new AtomicInteger(gridCount()); - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { + @Override public void apply() { + int id = idx.getAndIncrement(); - names.add(name); + try { + startGrid(id); - Ignite g = startGrid(name); + Thread.sleep(1000); - assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() > 0; - } - } - finally { - if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + catch (Exception e) { + throw F.wrap(e); + } + finally { + stopGrid(id); - int val = s.peek(); + info("Thread finished."); + } + } + }, threadCnt, "test-thread"); - int origVal = val; + while (!fut.isDone()) { + grid(0).compute().call(new IgniteCallable<Object>() { + /** */ + @IgniteInstanceResource + private Ignite g; - while (!fut.isDone()) - s.put(++val); + @Override public Object call() throws Exception { + IgniteAtomicSequence seq = g.atomicSequence(STRUCTURE_NAME, 1, true); - fut.get(); + assert seq != null; - for (Ignite g : G.allGrids()) - assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() == origVal; + for (int i = 0; i < 1000; i++) + seq.getAndIncrement(); + + return null; + } + }); } + + fut.get(); } /** * @throws Exception If failed. */ public void testAtomicSequenceTopologyChange() throws Exception { - try (IgniteAtomicSequence s = grid().atomicSequence(STRUCTURE_NAME, 10, true)) { + try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 10, true)) { Ignite g = startGrid(NEW_GRID_NAME); - assert g.atomicSequence(STRUCTURE_NAME, 10, false).get() == 1010; + assertEquals(1010, g.atomicSequence(STRUCTURE_NAME, 10, false).get()); - assert g.atomicSequence(STRUCTURE_NAME, 10, false).addAndGet(10) == 1020; + assertEquals(1020, g.atomicSequence(STRUCTURE_NAME, 10, false).addAndGet(10)); stopGrid(NEW_GRID_NAME); } @@ -754,29 +546,31 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testAtomicSequenceConstantTopologyChange() throws Exception { - try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) { - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - try { - String name = UUID.randomUUID().toString(); + doTestAtomicSequence(new ConstantTopologyChangeWorker()); + } - for (int i = 0; i < TOP_CHANGE_CNT; i++) { - try { - Ignite g = startGrid(name); + /** + * @throws Exception If failed. + */ + public void testAtomicSequenceConstantMultipleTopologyChange() throws Exception { + doTestAtomicSequence(new ConstantMultipleTopologyChangeWorker()); + } - assertTrue(g.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0); - } - finally { - if (i != TOP_CHANGE_CNT - 1) - stopGrid(name); - } - } - } - catch (Exception e) { - throw F.wrap(e); - } + /** + * Tests atomic sequence. + * + * @param topWorker Topology change worker. + * @throws Exception If failed. + */ + private void doTestAtomicSequence(ConstantTopologyChangeWorker topWorker) throws Exception { + try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) { + IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() { + @Override public Object apply(Ignite ignite) { + assertTrue(ignite.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0); + + return null; } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }); long old = s.get(); @@ -797,135 +591,131 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig /** * @throws Exception If failed. */ - public void testAtomicSequenceInitialization() throws Exception { - int threadCnt = 3; + public void testUncommitedTxLeave() throws Exception { + final int val = 10; - final AtomicInteger idx = new AtomicInteger(gridCount()); + grid(0).atomicLong(STRUCTURE_NAME, val, true); - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { - @Override public void apply() { - int id = idx.getAndIncrement(); + GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Ignite g = startGrid(NEW_GRID_NAME); try { - startGrid(id); + g.transactions().txStart(); - Thread.sleep(1000); + g.cache(TRANSACTIONAL_CACHE_NAME).put(1, 1); - } - catch (Exception e) { - throw F.wrap(e); + assertEquals(val + 1, g.atomicLong(STRUCTURE_NAME, val, false).incrementAndGet()); } finally { - stopGrid(id); - - info("Thread finished."); + stopGrid(NEW_GRID_NAME); } - } - }, threadCnt, "test-thread"); - - while (!fut.isDone()) { - grid(0).compute().call(new IgniteCallable<Object>() { - /** */ - @IgniteInstanceResource - private Ignite g; - - @Override public Object call() throws Exception { - IgniteAtomicSequence seq = g.atomicSequence(STRUCTURE_NAME, 1, true); - assert seq != null; - - for (int i = 0; i < 1000; i++) - seq.getAndIncrement(); + return null; + } + }).get(); - return null; - } - }); - } + waitForDiscovery(G.allGrids().toArray(new Ignite[gridCount()])); - fut.get(); + assertEquals(val + 1, grid(0).atomicLong(STRUCTURE_NAME, val, false).get()); } /** - * @throws Exception If failed. + * */ - public void testAtomicSequenceConstantMultipleTopologyChange() throws Exception { - try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) { + private class ConstantTopologyChangeWorker { + /** */ + protected final AtomicBoolean failed = new AtomicBoolean(false); + + /** + * Starts changing cluster's topology. + * + * @return Future. + */ + IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) { IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { @Override public void apply() { try { for (int i = 0; i < TOP_CHANGE_CNT; i++) { - Collection<String> names = new GridLeanSet<>(3); - - try { - for (int j = 0; j < 3; j++) { - String name = UUID.randomUUID().toString(); + if (failed.get()) + return; - names.add(name); + String name = UUID.randomUUID().toString(); - Ignite g = startGrid(name); + try { + Ignite g = startGrid(name); - assertTrue(g.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0); - } + callback.apply(g); } finally { if (i != TOP_CHANGE_CNT - 1) - for (String name : names) - stopGrid(name); + stopGrid(name); } } } catch (Exception e) { + failed.set(true); + throw F.wrap(e); } } }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - long old = s.get(); - - while (!fut.isDone()) { - assertEquals(old, s.get()); - - long val = s.incrementAndGet(); - - assertTrue(val > old); - - old = val; - } - - fut.get(); + return fut; } } /** - * @throws Exception If failed. + * */ - public void testUncommitedTxLeave() throws Exception { - final int val = 10; + private class ConstantMultipleTopologyChangeWorker extends ConstantTopologyChangeWorker { + /** + * Starts changing cluster's topology. + * + * @return Future. + */ + @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) { + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { + @Override public void apply() { + try { + for (int i = 0; i < TOP_CHANGE_CNT; i++) { + if (failed.get()) + return; - grid(0).atomicLong(STRUCTURE_NAME, val, true); + Collection<String> names = new GridLeanSet<>(3); - GridTestUtils.runAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - Ignite g = startGrid(NEW_GRID_NAME); + try { + for (int j = 0; j < 3; j++) { + if (failed.get()) + return; - try { - g.transactions().txStart(); + String name = UUID.randomUUID().toString(); + Ignite g = startGrid(name); - g.cache(TRANSACTIONAL_CACHE_NAME).put(1, 1); + names.add(name); - assert g.atomicLong(STRUCTURE_NAME, val, false).incrementAndGet() == val + 1; - } - finally { - stopGrid(NEW_GRID_NAME); - } + callback.apply(g); + } + } + finally { + if (i != TOP_CHANGE_CNT - 1) { - return null; - } - }).get(); + for (String name : names) + stopGrid(name); + } + } + } + } + catch (Exception e) { + failed.set(true); - waitForDiscovery(G.allGrids().toArray(new Ignite[gridCount()])); + throw F.wrap(e); + } + } + }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - assert grid(0).atomicLong(STRUCTURE_NAME, val, false).get() == val + 1; + return fut; + } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/df42d4a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java index 86b763a..a9cd470 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java @@ -34,6 +34,4 @@ public class GridCachePartitionedOffheapDataStructuresFailoverSelfTest extends G @Override protected CacheMemoryMode collectionMemoryMode() { return OFFHEAP_TIERED; } - - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/df42d4a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java index 69de7cd..902ba44 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java @@ -32,11 +32,6 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED; public class GridCacheReplicatedDataStructuresFailoverSelfTest extends GridCacheAbstractDataStructuresFailoverSelfTest { /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-801"); - } - - /** {@inheritDoc} */ @Override protected CacheMode collectionCacheMode() { return REPLICATED; }
