Repository: ignite Updated Branches: refs/heads/ignite-1534-1 a586686c4 -> 0f083fd18
ignite-1534 Fixed races in dynamic cache start Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0f083fd1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0f083fd1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0f083fd1 Branch: refs/heads/ignite-1534-1 Commit: 0f083fd183cfdebde0f9c91244ee434180cd0ccf Parents: a586686 Author: sboikov <[email protected]> Authored: Thu Oct 1 13:48:19 2015 +0300 Committer: sboikov <[email protected]> Committed: Thu Oct 1 16:31:00 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheContext.java | 2 +- .../processors/cache/GridCacheMvccManager.java | 20 ++- .../cache/distributed/dht/GridDhtGetFuture.java | 4 +- .../dht/GridPartitionedGetFuture.java | 5 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 9 +- .../distributed/near/GridNearGetFuture.java | 2 + .../CacheGetFutureHangsSelfTest.java | 156 +++++++++---------- .../testsuites/IgniteCacheTestSuite4.java | 3 + 8 files changed, 108 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0f083fd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 5385dec..3a1cee6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1848,7 +1848,7 @@ public class GridCacheContext<K, V> implements Externalizable { boolean deserializePortable, boolean cpy) { assert key != null; - assert val != null; + assert val != null || skipVals; if (!keepCacheObjects) { Object key0 = key.value(cacheObjCtx, false); http://git-wip-us.apache.org/repos/asf/ignite/blob/0f083fd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index dd51da2..0960c9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -391,13 +391,14 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { /** * @param futVer Future ID. * @param fut Future. + * @return {@code False} if future was forcibly completed with error. */ - public void addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut) { + public boolean addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut) { IgniteInternalFuture<?> old = atomicFuts.put(futVer, fut); assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']'; - onFutureAdded(fut); + return onFutureAdded(fut); } /** @@ -529,12 +530,21 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { /** * @param fut Future. + * @return {@code False} if future was forcibly completed with error. */ - private void onFutureAdded(IgniteInternalFuture<?> fut) { - if (stopping) + private boolean onFutureAdded(IgniteInternalFuture<?> fut) { + if (stopping) { ((GridFutureAdapter)fut).onDone(stopError()); - else if (cctx.kernalContext().clientDisconnected()) + + return false; + } + else if (cctx.kernalContext().clientDisconnected()) { ((GridFutureAdapter)fut).onDone(disconnectedError(null)); + + return false; + } + + return true; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0f083fd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 76aaf72..a67b1de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -447,8 +447,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col if (v == null) it.remove(); - else if (!skipVals) - info.value((CacheObject)v); + else + info.value(skipVals ? null : (CacheObject)v); } return infos; http://git-wip-us.apache.org/repos/asf/ignite/blob/0f083fd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 0202c53..abbe7b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -587,8 +587,11 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M if (keysSize != 0) { Map<K, V> map = new GridLeanMap<>(keysSize); - for (GridCacheEntryInfo info : infos) + for (GridCacheEntryInfo info : infos) { + assert skipVals == (info.value() == null); + cctx.addResult(map, info.key(), info.value(), skipVals, false, deserializePortable, false); + } return map; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0f083fd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index fb2c5ad..41df53a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -825,8 +825,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> futVer = cctx.versions().next(topVer); - if (storeFuture()) - cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this); + if (storeFuture()) { + if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this)) { + assert isDone() : GridNearAtomicUpdateFuture.this; + + return; + } + } // Assign version on near node in CLOCK ordering mode even if fastMap is false. if (updVer == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/0f083fd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index a7875f6..d9763f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -703,6 +703,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma CacheObject val = info.value(); KeyCacheObject key = info.key(); + assert skipVals == (info.value() == null); + cctx.addResult(map, key, val, skipVals, false, deserializePortable, false); } catch (GridCacheEntryRemovedException ignore) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0f083fd1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java index 8e8447e..e8622aa 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java @@ -18,23 +18,21 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Set; +import java.util.List; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReferenceArray; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.marshaller.optimized.OptimizedMarshaller; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; @@ -45,22 +43,14 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest { /** Grid count. */ private static final int GRID_CNT = 8; - /** Grids. */ - private static Ignite[] grids; + /** */ + private AtomicReferenceArray<Ignite> nodes; - /** Ids. */ - private static String[] ids; - - /** Flags. */ - private static AtomicBoolean[] flags; - - /** Futs. */ - private static Collection<IgniteInternalFuture> futs; - - /** Alive grids. */ - private static Set<Integer> aliveGrids; + /** */ + private volatile boolean done; /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -81,17 +71,27 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest { return cfg; } + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 5 * 60_000; + } + /** * @throws Exception If failed. */ - public void testFailover() throws Exception { - int cnt = 10; + public void testContainsKeyFailover() throws Exception { + int cnt = 3; for (int i = 0; i < cnt; i++) { try { - U.debug("*** Iteration " + (i + 1) + '/' + cnt); - - init(); + log.info("Iteration: " + (i + 1) + '/' + cnt); doTestFailover(); } @@ -102,54 +102,34 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest { } /** - * Initializes test. - */ - private void init() { - grids = new Ignite[GRID_CNT + 1]; - - ids = new String[GRID_CNT + 1]; - - aliveGrids = new HashSet<>(); - - flags = new AtomicBoolean[GRID_CNT + 1]; - - futs = new ArrayList<>(); - } - - /** * Executes one test iteration. + * @throws Exception If failed. */ private void doTestFailover() throws Exception { try { - for (int i = 0; i < GRID_CNT + 1; i++) { - final IgniteEx grid = startGrid(i); + done = false; - grids[i] = grid; + nodes = new AtomicReferenceArray<>(GRID_CNT); - ids[i] = grid.localNode().id().toString(); + startGridsMultiThreaded(GRID_CNT, false); - aliveGrids.add(i); + for (int i = 0; i < GRID_CNT ; i++) + assertTrue(nodes.compareAndSet(i, null, ignite(i))); - flags[i] = new AtomicBoolean(); - } + List<IgniteInternalFuture> futs = new ArrayList<>(); for (int i = 0; i < GRID_CNT + 1; i++) { - final int gridIdx = i; - futs.add(multithreadedAsync(new Runnable() { @Override public void run() { - IgniteCache cache = grids[gridIdx].cache(null); + T2<Ignite, Integer> ignite; - while (!flags[gridIdx].get()) { - int idx = ThreadLocalRandom.current().nextInt(GRID_CNT + 1); + while ((ignite = randomNode()) != null) { + IgniteCache<Object, Object> cache = ignite.get1().cache(null); - String id = ids[idx]; + for (int i = 0; i < 100; i++) + cache.containsKey(ThreadLocalRandom.current().nextInt(100_000)); - if (id != null /*&& grids[gridIdx] != null*/) { - //U.debug("!!! Grid containsKey start " + gridIdx); - cache.containsKey(id); - //U.debug("!!! Grid containsKey finished " + gridIdx); - } + assertTrue(nodes.compareAndSet(ignite.get2(), null, ignite.get1())); try { Thread.sleep(ThreadLocalRandom.current().nextLong(50)); @@ -163,18 +143,15 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest { futs.add(multithreadedAsync(new Runnable() { @Override public void run() { - IgniteCache cache = grids[gridIdx].cache(null); + T2<Ignite, Integer> ignite; - while (!flags[gridIdx].get()) { - int idx = ThreadLocalRandom.current().nextInt(GRID_CNT + 1); + while ((ignite = randomNode()) != null) { + IgniteCache<Object, Object> cache = ignite.get1().cache(null); - String id = ids[idx]; + for (int i = 0; i < 100; i++) + cache.put(ThreadLocalRandom.current().nextInt(100_000), UUID.randomUUID()); - if (id != null /*&& grids[gridIdx] != null*/) { - //U.debug("!!! Grid put start " + gridIdx); - cache.put(id, UUID.randomUUID()); - //U.debug("!!! Grid put finished " + gridIdx); - } + assertTrue(nodes.compareAndSet(ignite.get2(), null, ignite.get1())); try { Thread.sleep(ThreadLocalRandom.current().nextLong(50)); @@ -187,35 +164,50 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest { }, 1, "put-thread-" + i)); } - while (aliveGrids.size() > 1) { - final int gridToKill = ThreadLocalRandom.current().nextInt(GRID_CNT) + 1; + try { + int aliveGrids = GRID_CNT; - if (gridToKill > 0 && grids[gridToKill] != null) { - U.debug("!!! Trying to kill grid " + gridToKill); + while (aliveGrids > 0) { + T2<Ignite, Integer> ignite = randomNode(); - //synchronized (mons[gridToKill]) { - U.debug("!!! Grid stop start " + gridToKill); + assert ignite != null; - grids[gridToKill].close(); + Ignite ignite0 = ignite.get1(); - aliveGrids.remove(gridToKill); + log.info("Stop node: " + ignite0.name()); - grids[gridToKill] = null; + ignite0.close(); - flags[gridToKill].set(true); + log.info("Node stop finished: " + ignite0.name()); - U.debug("!!! Grid stop finished " + gridToKill); - //} + aliveGrids--; } } + finally { + done = true; + } - Thread.sleep(ThreadLocalRandom.current().nextLong(100)); + for (IgniteInternalFuture fut : futs) + fut.get(); } finally { - flags[0].set(true); + done = true; + } + } - for (IgniteInternalFuture fut : futs) - fut.get(); + /** + * @return Random node and its index. + */ + @Nullable private T2<Ignite, Integer> randomNode() { + while (!done) { + int idx = ThreadLocalRandom.current().nextInt(GRID_CNT); + + Ignite ignite = nodes.get(idx); + + if (ignite != null && nodes.compareAndSet(idx, ignite, null)) + return new T2<>(ignite, idx); } + + return null; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/0f083fd1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index 228d99c..b89bffd 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -77,6 +77,7 @@ import org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransaction import org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransactionSelfTest; import org.apache.ignite.internal.processors.cache.IgniteSystemCacheOnClientTest; import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarlyTest; +import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHangsSelfTest; import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutTest; @@ -280,6 +281,8 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(CrossCacheLockTest.class); suite.addTestSuite(IgniteCrossCacheTxSelfTest.class); + suite.addTestSuite(CacheGetFutureHangsSelfTest.class); + return suite; } } \ No newline at end of file
