Repository: ignite
Updated Branches:
  refs/heads/ignite-801 ead78a4fa -> 1d7d7786e
ignite-801: fixed issue in atomic reference and countdown latch in case of topology changes

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1d7d7786
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1d7d7786
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1d7d7786

Branch: refs/heads/ignite-801
Commit: 1d7d7786ef40d8d416222af054203f3ab328a712
Parents: ead78a4
Author: Denis Magda <[email protected]>
Authored: Fri Oct 2 15:09:03 2015 +0300
Committer: Denis Magda <[email protected]>
Committed: Fri Oct 2 15:09:03 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheUtils.java | 19 +++++++--------
 .../GridCacheAtomicReferenceImpl.java | 10 ++++----
 .../GridCacheCountDownLatchImpl.java | 16 ++-----------
 .../GridTransactionalCacheQueueImpl.java | 2 +-
 ...eAbstractDataStructuresFailoverSelfTest.java | 25 +++++++++++++++++---
 5 files changed, 39 insertions(+), 33 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7d7786/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 7854c93..9bf8938 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1774,19 +1774,16 @@ public class GridCacheUtils {
                         return c.call();
                     }
                     catch (IgniteCheckedException e) {
-                        if (X.hasCause(e, ClusterTopologyCheckedException.class) ||
-                            X.hasCause(e, IgniteTxRollbackCheckedException.class) ||
-                            X.hasCause(e, CachePartialUpdateCheckedException.class)) {
-                            if (i < retries - 1) {
-                                err = e;
-
-                                U.sleep(1);
+                        if (i == retries)
+                            throw e;
 
-                                continue;
-                            }
+                        if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
+                            ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
 
-                            throw e;
+                            topErr.retryReadyFuture().get();
                         }
+                        else if (X.hasCause(e, IgniteTxRollbackCheckedException.class))
+                            U.sleep(1);
                         else
                             throw e;
                     }
@@ -1797,4 +1794,4 @@ public class GridCacheUtils {
             }
         };
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7d7786/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
index b25e111..2d8f7b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
@@ -40,6 +40,7 @@ import org.apache.ignite.lang.IgnitePredicate;
 
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
 
 /**
  * Cache atomic reference implementation.
@@ -230,7 +231,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
      * @return Callable for execution in async and sync mode.
      */
     private Callable<Boolean> internalSet(final T val) {
-        return new Callable<Boolean>() {
+        return retryTopologySafe(new Callable<Boolean>() {
             @Override public Boolean call() throws Exception {
                 try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
@@ -252,7 +253,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
                     throw e;
                 }
             }
-        };
+        });
     }
 
     /**
@@ -265,7 +266,8 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
      */
     private Callable<Boolean> internalCompareAndSet(final IgnitePredicate<T> expValPred,
         final IgniteClosure<T, T> newValClos) {
-        return new Callable<Boolean>() {
+
+        return retryTopologySafe(new Callable<Boolean>() {
             @Override public Boolean call() throws Exception {
                 try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
@@ -295,7 +297,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
                     throw e;
                 }
             }
-        };
+        });
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7d7786/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 56be431..c984ab3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -342,21 +342,9 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
     private class GetCountCallable implements Callable<Integer> {
         /** {@inheritDoc} */
         @Override public Integer call() throws Exception {
-            Integer val;
+            GridCacheCountDownLatchValue latchVal = latchView.get(key);
 
-            //REMOVE TR
-            try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
-                GridCacheCountDownLatchValue latchVal = latchView.get(key);
-
-                if (latchVal == null)
-                    return 0;
-
-                val = latchVal.get();
-
-                tx.rollback();
-            }
-
-            return val;
+            return latchVal == null ? 0 : latchVal.get();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7d7786/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
index c7750a6..b14bb5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
@@ -244,4 +244,4 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
             throw U.convertException(e);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d7d7786/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 0b12d63..eb07bbc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -451,6 +451,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     private void doTestQueue(ConstantTopologyChangeWorker topWorker) throws Exception {
+        int queueMaxSize = 100;
+
         try (IgniteQueue<Integer> s = grid(0).queue(STRUCTURE_NAME, 0, config(false))) {
             s.put(1);
 
@@ -464,15 +466,32 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
 
             int val = s.peek();
 
-            int origVal = val;
+            while (!fut.isDone()) {
+                if (s.size() == queueMaxSize) {
+                    int last = 0;
+
+                    for (int i = 0, size = s.size() - 1; i < size; i++) {
+                        int cur = s.poll();
+
+                        if (i == 0) {
+                            last = cur;
+
+                            continue;
+                        }
+
+                        assertEquals(last, cur - 1);
+
+                        last = cur;
+                    }
+                }
 
-            while (!fut.isDone())
                 s.put(++val);
+            }
 
             fut.get();
 
             for (Ignite g : G.allGrids())
-                assertEquals(origVal, (int)g.<Integer>queue(STRUCTURE_NAME, 0, null).peek());
+                assertEquals(val, (int)g.<Integer>queue(STRUCTURE_NAME, 0, null).peek());
         }
     }
 
