ignite-1.5 Fixed asserts in GridTransactionalCacheQueueImpl to take into account the case when queue data was really lost.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/add61614 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/add61614 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/add61614 Branch: refs/heads/ignite-1537 Commit: add6161451d3e8b263782e52c51a0e2b34daeb3f Parents: 3016c0f Author: sboikov <[email protected]> Authored: Wed Dec 9 16:32:48 2015 +0300 Committer: sboikov <[email protected]> Committed: Wed Dec 9 16:32:48 2015 +0300 ---------------------------------------------------------------------- .../GridTransactionalCacheQueueImpl.java | 32 +++-- ...eAbstractDataStructuresFailoverSelfTest.java | 138 +++++++++++++++---- 2 files changed, 128 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/add61614/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java index 32e94d3..7b80765 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java @@ -96,22 +96,28 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> @Override public T call() throws Exception { T retVal; - try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { - Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get(); + while (true) { + try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, 
REPEATABLE_READ)) { + Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get(); - if (idx != null) { - checkRemoved(idx); + if (idx != null) { + checkRemoved(idx); - retVal = (T)cache.getAndRemove(itemKey(idx)); + retVal = (T)cache.getAndRemove(itemKey(idx)); - assert retVal != null : idx; - } - else - retVal = null; + if (retVal == null) { // Possible if data was lost. + tx.commit(); - tx.commit(); + continue; + } + } + else + retVal = null; - return retVal; + tx.commit(); + + return retVal; + } } } }).call(); @@ -188,9 +194,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> if (idx != null) { checkRemoved(idx); - boolean rmv = cache.remove(itemKey(idx)); - - assert rmv : idx; + cache.remove(itemKey(idx)); } tx.commit(); http://git-wip-us.apache.org/repos/asf/ignite/blob/add61614/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java index cb16aeb..1e15c15 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java @@ -196,14 +196,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. 
*/ public void testAtomicLongConstantTopologyChange() throws Exception { - doTestAtomicLong(new ConstantTopologyChangeWorker()); + doTestAtomicLong(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT)); } /** * @throws Exception If failed. */ public void testAtomicLongConstantMultipleTopologyChange() throws Exception { - doTestAtomicLong(multipleTopologyChangeWorker()); + doTestAtomicLong(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT)); } /** @@ -258,14 +258,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testAtomicReferenceConstantTopologyChange() throws Exception { - doTestAtomicReference(new ConstantTopologyChangeWorker()); + doTestAtomicReference(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT)); } /** * @throws Exception If failed. */ public void testAtomicReferenceConstantMultipleTopologyChange() throws Exception { - doTestAtomicReference(multipleTopologyChangeWorker()); + doTestAtomicReference(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT)); } /** @@ -326,14 +326,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testAtomicStampedConstantTopologyChange() throws Exception { - doTestAtomicStamped(new ConstantTopologyChangeWorker()); + doTestAtomicStamped(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT)); } /** * @throws Exception If failed. */ public void testAtomicStampedConstantMultipleTopologyChange() throws Exception { - doTestAtomicStamped(multipleTopologyChangeWorker()); + doTestAtomicStamped(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT)); } /** @@ -687,14 +687,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. 
*/ public void testCountDownLatchConstantTopologyChange() throws Exception { - doTestCountDownLatch(new ConstantTopologyChangeWorker()); + doTestCountDownLatch(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT)); } /** * @throws Exception If failed. */ public void testCountDownLatchConstantMultipleTopologyChange() throws Exception { - doTestCountDownLatch(multipleTopologyChangeWorker()); + doTestCountDownLatch(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT)); } /** @@ -758,15 +758,73 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig /** * @throws Exception If failed. */ + public void testQueueTopologyChange() throws Exception { + ConstantTopologyChangeWorker topWorker = new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT); + + try (final IgniteQueue<Integer> q = grid(0).queue(STRUCTURE_NAME, 0, config(false))) { + for (int i = 0; i < 1000; i++) + q.add(i); + + final IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() { + @Override public Object apply(Ignite ignite) { + return null; + } + }); + + IgniteInternalFuture<?> takeFut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + while (!fut.isDone()) + q.take(); + + return null; + } + }); + + IgniteInternalFuture<?> pollFut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + while (!fut.isDone()) + q.poll(); + + return null; + } + }); + + IgniteInternalFuture<?> addFut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + while (!fut.isDone()) + q.add(0); + + return null; + } + }); + + fut.get(); + + pollFut.get(); + addFut.get(); + + q.add(0); + + takeFut.get(); + } + } + + /** + * @throws Exception If failed. 
+ */ public void testQueueConstantTopologyChange() throws Exception { - doTestQueue(new ConstantTopologyChangeWorker()); + int topChangeThreads = collectionCacheMode() == CacheMode.PARTITIONED ? 1 : TOP_CHANGE_THREAD_CNT; + + doTestQueue(new ConstantTopologyChangeWorker(topChangeThreads)); } /** * @throws Exception If failed. */ public void testQueueConstantMultipleTopologyChange() throws Exception { - doTestQueue(multipleTopologyChangeWorker()); + int topChangeThreads = collectionCacheMode() == CacheMode.PARTITIONED ? 1 : TOP_CHANGE_THREAD_CNT; + + doTestQueue(multipleTopologyChangeWorker(topChangeThreads)); } /** @@ -902,14 +960,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testAtomicSequenceConstantTopologyChange() throws Exception { - doTestAtomicSequence(new ConstantTopologyChangeWorker()); + doTestAtomicSequence(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT)); } /** * @throws Exception If failed. */ public void testAtomicSequenceConstantMultipleTopologyChange() throws Exception { - doTestAtomicSequence(multipleTopologyChangeWorker()); + doTestAtomicSequence(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT)); } /** @@ -977,11 +1035,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig } /** + * @param topChangeThreads Number of topology change threads. + * * @return Specific multiple topology change worker implementation. */ - private ConstantTopologyChangeWorker multipleTopologyChangeWorker() { - return collectionCacheMode() == CacheMode.PARTITIONED ? new PartitionedMultipleTopologyChangeWorker() : - new MultipleTopologyChangeWorker(); + private ConstantTopologyChangeWorker multipleTopologyChangeWorker(int topChangeThreads) { + return collectionCacheMode() == CacheMode.PARTITIONED ? 
+ new PartitionedMultipleTopologyChangeWorker(topChangeThreads) : + new MultipleTopologyChangeWorker(topChangeThreads); } /** @@ -991,13 +1052,24 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig /** */ protected final AtomicBoolean failed = new AtomicBoolean(false); + /** */ + private final int topChangeThreads; + + /** + * @param topChangeThreads Number of topology change threads. + */ + public ConstantTopologyChangeWorker(int topChangeThreads) { + this.topChangeThreads = topChangeThreads; + } + /** * Starts changing cluster's topology. * + * @param cb Callback to run after node start. * @return Future. */ - IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) { - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { + IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> cb) { + return GridTestUtils.runMultiThreadedAsync(new CA() { @Override public void apply() { try { for (int i = 0; i < TOP_CHANGE_CNT; i++) { @@ -1011,7 +1083,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig Ignite g = startGrid(name); - callback.apply(g); + cb.apply(g); } finally { if (i != TOP_CHANGE_CNT - 1) @@ -1024,9 +1096,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig throw F.wrap(e); } } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - return fut; + }, topChangeThreads, "topology-change-thread"); } } @@ -1035,12 +1105,19 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig */ private class MultipleTopologyChangeWorker extends ConstantTopologyChangeWorker { /** + * @param topChangeThreads Number of topology change threads. + */ + public MultipleTopologyChangeWorker(int topChangeThreads) { + super(topChangeThreads); + } + + /** * Starts changing cluster's topology. * * @return Future. 
*/ - @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) { - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { + @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> cb) { + return GridTestUtils.runMultiThreadedAsync(new CA() { @Override public void apply() { try { for (int i = 0; i < TOP_CHANGE_CNT; i++) { @@ -1062,7 +1139,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig names.add(name); - callback.apply(g); + cb.apply(g); } } finally { @@ -1079,8 +1156,6 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig } } }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); - - return fut; } } @@ -1092,11 +1167,18 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig private CyclicBarrier barrier; /** + * @param topChangeThreads Number of topology change threads. + */ + public PartitionedMultipleTopologyChangeWorker(int topChangeThreads) { + super(topChangeThreads); + } + + /** * Starts changing cluster's topology. * * @return Future. */ - @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) { + @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> cb) { final Semaphore sem = new Semaphore(TOP_CHANGE_THREAD_CNT); final ConcurrentSkipListSet<String> startedNodes = new ConcurrentSkipListSet<>(); @@ -1151,7 +1233,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig Ignite g = startGrid(name); - callback.apply(g); + cb.apply(g); } try {
