Repository: ignite Updated Branches: refs/heads/ignite-5763 [created] 3373ec158
ignite-5763 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3373ec15 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3373ec15 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3373ec15 Branch: refs/heads/ignite-5763 Commit: 3373ec158f3eb6022e6906e3dc6d76fcab178087 Parents: 621b0ed Author: sboikov <sboi...@gridgain.com> Authored: Mon Jul 17 12:33:56 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Jul 17 12:33:56 2017 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 38 ++++- .../cache/IgniteDynamicCacheMultinodeTest.java | 168 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite4.java | 2 + 3 files changed, 202 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3373ec15/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 9f5bd3f..347f6fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -192,6 +192,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } }; + /** */ + private final Object discoEvtMux = new Object(); + /** Discovery event worker. 
*/ private final DiscoveryWorker discoWrk = new DiscoveryWorker(); @@ -551,6 +554,26 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { final ClusterNode node, final Collection<ClusterNode> topSnapshot, final Map<Long, Collection<ClusterNode>> snapshots, + @Nullable DiscoverySpiCustomMessage spiCustomMsg) { + synchronized (discoEvtMux) { + onDiscovery0(type, topVer, node, topSnapshot, snapshots, spiCustomMsg); + } + } + + /** + * @param type Event type. + * @param topVer Event topology version. + * @param node Event node. + * @param topSnapshot Topology snapshot. + * @param snapshots Topology snapshots history. + * @param spiCustomMsg Custom event. + */ + private void onDiscovery0( + final int type, + final long topVer, + final ClusterNode node, + final Collection<ClusterNode> topSnapshot, + final Map<Long, Collection<ClusterNode>> snapshots, @Nullable DiscoverySpiCustomMessage spiCustomMsg ) { DiscoveryCustomMessage customMsg = spiCustomMsg == null ? null @@ -2062,12 +2085,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { public void clientCacheStartEvent(UUID reqId, @Nullable Map<String, DynamicCacheChangeRequest> startReqs, @Nullable Set<String> cachesToClose) { - discoWrk.addEvent(EVT_DISCOVERY_CUSTOM_EVT, - AffinityTopologyVersion.NONE, - localNode(), - null, - Collections.<ClusterNode>emptyList(), - new ClientCacheChangeDummyDiscoveryMessage(reqId, startReqs, cachesToClose)); + // Prevent race when discovery message was processed, but was not yet passed to discoWrk. 
+ synchronized (discoEvtMux) { + discoWrk.addEvent(EVT_DISCOVERY_CUSTOM_EVT, + AffinityTopologyVersion.NONE, + localNode(), + null, + Collections.<ClusterNode>emptyList(), + new ClientCacheChangeDummyDiscoveryMessage(reqId, startReqs, cachesToClose)); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/3373ec15/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheMultinodeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheMultinodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheMultinodeTest.java new file mode 100644 index 0000000..d362189 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheMultinodeTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; + +/** + * + */ +public class IgniteDynamicCacheMultinodeTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES = 6; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODES - 2); + + client = true; + + startGridsMultiThreaded(NODES - 2, 2); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testGetOrCreateCache() throws Exception { + createCacheMultinode(TestOp.GET_OR_CREATE_CACHE); + } + + /** + * @throws Exception If failed. + */ + public void testGetOrCreateCaches() throws Exception { + createCacheMultinode(TestOp.GET_OR_CREATE_CACHES); + } + + /** + * @throws Exception If failed. + */ + private void createCacheMultinode(final TestOp op) throws Exception { + final int THREADS = NODES * 3; + + for (int i = 0; i < 10; i++) { + log.info("Iteration: " + i); + + final CyclicBarrier b = new CyclicBarrier(THREADS); + + final AtomicInteger idx = new AtomicInteger(); + + final int iter = i; + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + Ignite node = ignite(idx.incrementAndGet() % NODES); + + b.await(); + + boolean sleep = iter % 2 == 0; + + if (sleep) + Thread.sleep(ThreadLocalRandom.current().nextLong(100) + 1); + + switch (op) { + case GET_OR_CREATE_CACHE: + node.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + break; + + case GET_OR_CREATE_CACHES: + node.getOrCreateCaches(cacheConfigurations()); + + break; + } + + return null; + } + }, THREADS, "start-cache"); + + for (String cache : ignite(0).cacheNames()) + ignite(0).destroyCache(cache); + } + } + + /** + * @return Cache configurations. + */ + private List<CacheConfiguration> cacheConfigurations() { + List<CacheConfiguration> ccfgs = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + CacheConfiguration ccfg = new CacheConfiguration("cache-" + i); + + ccfg.setAtomicityMode(i % 2 == 0 ? 
ATOMIC : TRANSACTIONAL); + + ccfgs.add(ccfg); + } + + return ccfgs; + } + + /** + * + */ + enum TestOp { + /** */ + GET_OR_CREATE_CACHE, + + /** */ + GET_OR_CREATE_CACHES + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3373ec15/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index e7f38be..d931ea9 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -74,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheTxPreloadNoWriteTe import org.apache.ignite.internal.processors.cache.IgniteCacheTxReplicatedPeekModesTest; import org.apache.ignite.internal.processors.cache.IgniteClientCacheInitializationFailTest; import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheFilterTest; +import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheMultinodeTest; import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheStartNoExchangeTimeoutTest; import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheStartSelfTest; import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheStartStopConcurrentTest; @@ -219,6 +220,7 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(IgniteCacheTxPreloadNoWriteTest.class); suite.addTestSuite(IgniteDynamicCacheStartSelfTest.class); + suite.addTestSuite(IgniteDynamicCacheMultinodeTest.class); suite.addTestSuite(IgniteDynamicCacheWithConfigStartSelfTest.class); suite.addTestSuite(IgniteCacheDynamicStopSelfTest.class); suite.addTestSuite(IgniteDynamicCacheStartStopConcurrentTest.class);