IGNITE-5187: Improved DynamicIndexAbstractConcurrentSelfTest reliability. This closes #2219.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/15da654a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/15da654a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/15da654a Branch: refs/heads/ignite-2.1 Commit: 15da654a3e5c2bbf14947f0d3dd08270a9ba6766 Parents: 0f9a895 Author: Alexander Paschenko <[email protected]> Authored: Wed Jul 5 15:15:35 2017 +0300 Committer: devozerov <[email protected]> Committed: Wed Jul 5 15:15:35 2017 +0300 ---------------------------------------------------------------------- .../DynamicIndexAbstractConcurrentSelfTest.java | 81 +++++++++++--------- 1 file changed, 45 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/15da654a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java index 3fb8a30..913d724 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java @@ -17,6 +17,17 @@ package org.apache.ignite.internal.processors.cache.index; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; @@ -37,21 +48,9 @@ import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; -import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.T3; import org.jetbrains.annotations.NotNull; -import javax.cache.Cache; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - import static org.apache.ignite.internal.IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi; /** @@ -66,7 +65,7 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde private static final int LARGE_CACHE_SIZE = 100_000; /** Latches to block certain index operations. */ - private static final ConcurrentHashMap<UUID, T2<CountDownLatch, AtomicBoolean>> BLOCKS = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap<UUID, T3<CountDownLatch, AtomicBoolean, CountDownLatch>> BLOCKS = new ConcurrentHashMap<>(); /** Cache mode. */ private final CacheMode cacheMode; @@ -80,7 +79,7 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde * @param cacheMode Cache mode. * @param atomicityMode Atomicity mode. */ - protected DynamicIndexAbstractConcurrentSelfTest(CacheMode cacheMode, CacheAtomicityMode atomicityMode) { + DynamicIndexAbstractConcurrentSelfTest(CacheMode cacheMode, CacheAtomicityMode atomicityMode) { this.cacheMode = cacheMode; this.atomicityMode = atomicityMode; } @@ -93,10 +92,11 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde } /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") @Override protected void afterTest() throws Exception { GridQueryProcessor.idxCls = null; - for (T2<CountDownLatch, AtomicBoolean> block : BLOCKS.values()) + for (T3<CountDownLatch, AtomicBoolean, CountDownLatch> block : BLOCKS.values()) block.get1().countDown(); BLOCKS.clear(); @@ -146,14 +146,14 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde put(srv1, 0, KEY_AFTER); // Test migration between normal servers. - blockIndexing(srv1Id); + CountDownLatch idxLatch = blockIndexing(srv1Id); QueryIndex idx1 = index(IDX_NAME_1, field(FIELD_NAME_1)); IgniteInternalFuture<?> idxFut1 = queryProcessor(cli).dynamicIndexCreate(CACHE_NAME, CACHE_NAME, TBL_NAME, idx1, false); - Thread.sleep(100); + idxLatch.countDown(); //srv1.close(); Ignition.stop(srv1.name(), true); @@ -167,14 +167,14 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde assertSqlSimpleData(SQL_SIMPLE_FIELD_1, KEY_AFTER - SQL_ARG_1); // Test migration from normal server to non-affinity server. - blockIndexing(srv2Id); + idxLatch = blockIndexing(srv2Id); QueryIndex idx2 = index(IDX_NAME_2, field(aliasUnescaped(FIELD_NAME_2))); IgniteInternalFuture<?> idxFut2 = queryProcessor(cli).dynamicIndexCreate(CACHE_NAME, CACHE_NAME, TBL_NAME, idx2, false); - Thread.sleep(100); + idxLatch.countDown(); //srv2.close(); Ignition.stop(srv2.name(), true); @@ -202,7 +202,7 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde createSqlCache(srv1); - blockIndexing(srv1); + CountDownLatch idxLatch = blockIndexing(srv1); QueryIndex idx1 = index(IDX_NAME_1, field(FIELD_NAME_1)); QueryIndex idx2 = index(IDX_NAME_2, field(aliasUnescaped(FIELD_NAME_2))); @@ -229,7 +229,7 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1)); assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_2, field(aliasUnescaped(FIELD_NAME_2))); - Thread.sleep(100); + idxLatch.countDown(); put(srv1, 0, KEY_AFTER); @@ -250,7 +250,7 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde createSqlCache(srv1); - blockIndexing(srv1); + CountDownLatch idxLatch = blockIndexing(srv1); QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1)); @@ -267,7 +267,7 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde idxFut.get(); - Thread.sleep(100L); + idxLatch.countDown(); assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1)); @@ -391,15 +391,16 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde put(srv1, 0, LARGE_CACHE_SIZE); // Start index operation in blocked state. - blockIndexing(srv1); - blockIndexing(srv2); + CountDownLatch idxLatch1 = blockIndexing(srv1); + CountDownLatch idxLatch2 = blockIndexing(srv2); QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1)); final IgniteInternalFuture<?> idxFut = queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, CACHE_NAME, TBL_NAME, idx, false); - Thread.sleep(100); + idxLatch1.countDown(); + idxLatch2.countDown(); // Start two more nodes and unblock index operation in the middle. Ignition.start(serverConfiguration(3)); @@ -435,19 +436,19 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde Ignite cli = Ignition.start(clientConfiguration(4)); // Start cache and populate it with data. - IgniteCache cache = createSqlCache(cli); + createSqlCache(cli); put(cli, KEY_AFTER); // Start index operation and block it on coordinator. - blockIndexing(srv1); + CountDownLatch idxLatch = blockIndexing(srv1); QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1)); final IgniteInternalFuture<?> idxFut = queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, CACHE_NAME, TBL_NAME, idx, false); - Thread.sleep(100); + idxLatch.countDown(); // Destroy cache (drop table). destroySqlCache(cli); @@ -967,10 +968,10 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde * @param node Node. */ @SuppressWarnings("SuspiciousMethodCalls") - private static void blockIndexing(Ignite node) { + private static CountDownLatch blockIndexing(Ignite node) { UUID nodeId = ((IgniteEx)node).localNode().id(); - blockIndexing(nodeId); + return blockIndexing(nodeId); } /** @@ -979,10 +980,14 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde * @param nodeId Node. */ @SuppressWarnings("SuspiciousMethodCalls") - private static void blockIndexing(UUID nodeId) { + private static CountDownLatch blockIndexing(UUID nodeId) { assertFalse(BLOCKS.contains(nodeId)); - BLOCKS.put(nodeId, new T2<>(new CountDownLatch(1), new AtomicBoolean())); + CountDownLatch idxLatch = new CountDownLatch(1); + + BLOCKS.put(nodeId, new T3<>(new CountDownLatch(1), new AtomicBoolean(), idxLatch)); + + return idxLatch; } /** @@ -1001,8 +1006,9 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde * * @param nodeId Node ID. */ + @SuppressWarnings("ConstantConditions") private static void unblockIndexing(UUID nodeId) { - T2<CountDownLatch, AtomicBoolean> blocker = BLOCKS.remove(nodeId); + T3<CountDownLatch, AtomicBoolean, CountDownLatch> blocker = BLOCKS.remove(nodeId); assertNotNull(blocker); @@ -1014,12 +1020,15 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde * * @param nodeId Node ID. */ + @SuppressWarnings("ConstantConditions") private static void awaitIndexing(UUID nodeId) { - T2<CountDownLatch, AtomicBoolean> blocker = BLOCKS.get(nodeId); + T3<CountDownLatch, AtomicBoolean, CountDownLatch> blocker = BLOCKS.get(nodeId); if (blocker != null) { assertTrue(blocker.get2().compareAndSet(false, true)); + blocker.get3().countDown(); + while (true) { try { blocker.get1().await();
