ignite-1534 Fixed races in dynamic cache start
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e250c7f5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e250c7f5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e250c7f5 Branch: refs/heads/ignite-1534-1 Commit: e250c7f5c42fcdde8c776242314963cc2e0a63b3 Parents: 2029b7b Author: sboikov <[email protected]> Authored: Thu Oct 1 12:03:44 2015 +0300 Committer: sboikov <[email protected]> Committed: Thu Oct 1 12:03:44 2015 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 2 +- .../cache/DynamicCacheDescriptor.java | 17 +++ .../GridCachePartitionExchangeManager.java | 72 +++-------- .../processors/cache/GridCacheProcessor.java | 8 +- .../distributed/IgniteCacheCreatePutTest.java | 125 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite4.java | 2 + 6 files changed, 165 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e250c7f5/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index aec36a2..1fe45b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -550,7 +550,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { gridStartTime = getSpi().getGridStartTime(); updateTopologyVersionIfGreater(new AffinityTopologyVersion(locNode.order()), - new DiscoCache(localNode(), getSpi().getRemoteNodes())); + new DiscoCache(localNode(), F.view(topSnapshot, F.remoteNodes(locNode.id())))); startLatch.countDown(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e250c7f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index 24df7e4..b100a31 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -68,6 +68,9 @@ public class DynamicCacheDescriptor { /** */ private AffinityTopologyVersion startTopVer; + /** */ + private boolean rcvdOnDiscovery; + /** * @param ctx Context. * @param cacheCfg Cache configuration. @@ -236,6 +239,20 @@ public class DynamicCacheDescriptor { this.updatesAllowed = updatesAllowed; } + /** + * @return {@code True} if received in discovery data. + */ + public boolean receivedOnDiscovery() { + return rcvdOnDiscovery; + } + + /** + * @param rcvdOnDiscovery {@code True} if received in discovery data. + */ + public void receivedOnDiscovery(boolean rcvdOnDiscovery) { + this.rcvdOnDiscovery = rcvdOnDiscovery; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DynamicCacheDescriptor.class, this, "cacheName", U.maskName(cacheCfg.getName())); http://git-wip-us.apache.org/repos/asf/ignite/blob/e250c7f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 3e77e0d..adc2174 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -105,18 +105,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** Partition resend timeout after eviction. */ private final long partResendTimeout = getLong(IGNITE_PRELOAD_RESEND_TIMEOUT, DFLT_PRELOAD_RESEND_TIMEOUT); - /** Latch which completes after local exchange future is created. */ - private GridFutureAdapter<?> locExchFut; - /** */ private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); /** Last partition refresh. */ private final AtomicLong lastRefresh = new AtomicLong(-1); - /** Pending futures. */ - private final Queue<GridDhtPartitionsExchangeFuture> pendingExchangeFuts = new ConcurrentLinkedQueue<>(); - /** */ @GridToStringInclude private ExchangeWorker exchWorker; @@ -229,31 +223,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } if (exchId != null) { - // Start exchange process. - pendingExchangeFuts.add(exchFut); + if (log.isDebugEnabled()) + log.debug("Discovery event (will start exchange): " + exchId); // Event callback - without this callback future will never complete. exchFut.onEvent(exchId, e); + // Start exchange process. + addFuture(exchFut); + } + else { if (log.isDebugEnabled()) - log.debug("Discovery event (will start exchange): " + exchId); - - locExchFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> t) { - if (!enterBusy()) - return; - - try { - // Unwind in the order of discovery events. - for (GridDhtPartitionsExchangeFuture f = pendingExchangeFuts.poll(); f != null; - f = pendingExchangeFuts.poll()) - addFuture(f); - } - finally { - leaveBusy(); - } - } - }); + log.debug("Do not start exchange for discovery event: " + evt); } } finally { @@ -266,8 +247,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana @Override protected void start0() throws IgniteCheckedException { super.start0(); - locExchFut = new GridFutureAdapter<>(); - exchWorker = new ExchangeWorker(); cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, @@ -328,12 +307,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (reconnect) reconnectExchangeFut = new GridFutureAdapter<>(); - new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start(); - - onDiscoveryEvent(cctx.localNodeId(), fut); + exchWorker.futQ.addFirst(fut); - // Allow discovery events to get processed. - locExchFut.onDone(); + new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start(); if (reconnect) { fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @@ -382,8 +358,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } - for (GridCacheContext cacheCtx : cctx.cacheContexts()) - cacheCtx.preloader().onInitialExchangeComplete(null); + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (cacheCtx.startTopologyVersion() == null) + cacheCtx.preloader().onInitialExchangeComplete(null); + } if (log.isDebugEnabled()) log.debug("Finished waiting for initial exchange: " + fut.exchangeId()); @@ -414,12 +392,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (AffinityReadyFuture f : readyFuts.values()) f.onDone(stopErr); - for (GridDhtPartitionsExchangeFuture f : pendingExchangeFuts) - f.onDone(stopErr); - - if (locExchFut != null) - locExchFut.onDone(stopErr); - U.cancel(exchWorker); if (log.isDebugEnabled()) @@ -583,22 +555,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param nodeId New node ID. - * @param fut Exchange future. - */ - void onDiscoveryEvent(UUID nodeId, GridDhtPartitionsExchangeFuture fut) { - if (!enterBusy()) - return; - - try { - addFuture(fut); - } - finally { - leaveBusy(); - } - } - - /** * @param evt Discovery event. * @return Affinity topology version. */ @@ -1033,7 +989,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana U.warn(log, "Pending exchange futures:"); - for (GridDhtPartitionsExchangeFuture fut : pendingExchangeFuts) + for (GridDhtPartitionsExchangeFuture fut : exchWorker.futQ) U.warn(log, ">>> " + fut); ExchangeFutureSet exchFuts = this.exchFuts; http://git-wip-us.apache.org/repos/asf/ignite/blob/e250c7f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 5591fa6..2dad84e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -805,7 +805,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { boolean loc = desc.locallyConfigured(); - if (loc || CU.affinityNode(locNode, filter)) { + if (loc || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) { CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); CachePluginManager pluginMgr = desc.pluginManager(); @@ -1958,7 +1958,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (req.initiatingNodeId() == null) desc.staticallyConfigured(true); - registeredCaches.put(maskNull(req.cacheName()), desc); + DynamicCacheDescriptor old = registeredCaches.put(maskNull(req.cacheName()), desc); + + assert old == null : old; + + desc.receivedOnDiscovery(true); ctx.discovery().setCacheFilter( req.cacheName(), http://git-wip-us.apache.org/repos/asf/ignite/blob/e250c7f5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java new file mode 100644 index 0000000..8b3d9d3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.marshaller.optimized.OptimizedMarshaller; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class IgniteCacheCreatePutTest extends GridCommonAbstractTest { + /** Grid count. */ + private static final int GRID_CNT = 3; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPeerClassLoadingEnabled(false); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + discoSpi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(discoSpi); + + OptimizedMarshaller marsh = new OptimizedMarshaller(); + marsh.setRequireSerializable(false); + + cfg.setMarshaller(marsh); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName("cache*"); + ccfg.setCacheMode(CacheMode.PARTITIONED); + ccfg.setBackups(1); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 3 * 60 * 1000L; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testStartNodes() throws Exception { + long stopTime = System.currentTimeMillis() + 2 * 60_000; + + try { + int iter = 0; + + while (System.currentTimeMillis() < stopTime) { + log.info("Iteration: " + iter++); + + try { + final AtomicInteger idx = new AtomicInteger(); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override + public Void call() throws Exception { + int node = idx.getAndIncrement(); + + Ignite ignite = startGrid(node); + + IgniteCache<Object, Object> cache = ignite.getOrCreateCache("cache1"); + + assertNotNull(cache); + + for (int i = 0; i < 100; i++) + cache.put(i, i); + + return null; + } + }, GRID_CNT, "start"); + } + finally { + stopAllGrids(); + } + } + } + finally { + stopAllGrids(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e250c7f5/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index f8c9d26..228d99c 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -79,6 +79,7 @@ import org.apache.ignite.internal.processors.cache.IgniteSystemCacheOnClientTest import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarlyTest; import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtTxPreloadSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheLockFailoverSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheMultiTxLockSelfTest; @@ -205,6 +206,7 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(IgniteDynamicCacheStartNoExchangeTimeoutTest.class); suite.addTestSuite(CacheAffinityEarlyTest.class); suite.addTestSuite(IgniteCacheCreatePutMultiNodeSelfTest.class); + suite.addTestSuite(IgniteCacheCreatePutTest.class); suite.addTestSuite(GridCacheTxLoadFromStoreOnLockSelfTest.class);
