IGNITE-5399 Manual cache rebalancing fix
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c098d75d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c098d75d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c098d75d Branch: refs/heads/ignite-2.1.2-exchange Commit: c098d75d4d767c7382867ca0bcb9e32a279487df Parents: 7ffe15e Author: Evgenii Zhuravlev <[email protected]> Authored: Thu Jun 22 09:43:03 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Jun 22 09:43:03 2017 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCache.java | 3 +- .../GridCachePartitionExchangeManager.java | 6 +- .../processors/cache/GridCachePreloader.java | 4 +- .../cache/GridCachePreloaderAdapter.java | 4 +- .../dht/preloader/GridDhtPartitionDemander.java | 17 +- .../GridDhtPartitionsExchangeFuture.java | 7 +- .../dht/preloader/GridDhtPreloader.java | 2 +- .../rebalancing/CacheManualRebalancingTest.java | 178 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite5.java | 2 + 9 files changed, 200 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c098d75d/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index aeabdb9..0cf2a82 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -1381,7 +1381,8 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * See {@link CacheConfiguration#getRebalanceDelay()} for more information on how to configure * rebalance re-partition delay. * <p> - * @return Future that will be completed when rebalancing is finished. + * @return Future that will be completed when rebalancing is finished. Future.get() returns true + * when rebalance was successfully finished. */ public IgniteFuture<?> rebalance(); http://git-wip-us.apache.org/repos/asf/ignite/blob/c098d75d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index dea690e..d997b79 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -83,6 +83,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridListSet; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.IgnitePair; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -783,7 +784,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param exchFut Exchange future. */ public IgniteInternalFuture<Boolean> forceRebalance(GridDhtPartitionsExchangeFuture exchFut) { - GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>(); + GridCompoundFuture<Boolean, Boolean> fut = new GridCompoundFuture<>(CU.boolReducer()); exchWorker.addExchangeFuture( new GridDhtPartitionsExchangeFuture(cctx, exchFut.discoveryEvent(), exchFut.exchangeId(), fut)); @@ -1984,6 +1985,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } + if (exchFut.forcedRebalanceFuture() != null) + exchFut.forcedRebalanceFuture().markInitialized(); + if (assignsCancelled) { // Pending exchange. U.log(log, "Skipping rebalancing (obsolete exchange ID) " + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + http://git-wip-us.apache.org/repos/asf/ignite/blob/c098d75d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 0ac0272..4e74532 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -29,7 +29,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; -import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; @@ -81,7 +81,7 @@ public interface GridCachePreloader { boolean forcePreload, int cnt, Runnable next, - @Nullable GridFutureAdapter<Boolean> forcedRebFut); + @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut); /** * @param p Preload predicate. http://git-wip-us.apache.org/repos/asf/ignite/blob/c098d75d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index 98874e4..d2ca229 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -30,8 +30,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; -import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; @@ -161,7 +161,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { boolean forcePreload, int cnt, Runnable next, - @Nullable GridFutureAdapter<Boolean> forcedRebFut) { + @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut) { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/c098d75d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index a1b45df..e4f0d08 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -257,7 +258,7 @@ public class GridDhtPartitionDemander { boolean force, int cnt, final Runnable next, - @Nullable final GridFutureAdapter<Boolean> forcedRebFut) { + @Nullable final GridCompoundFuture<Boolean, Boolean> forcedRebFut) { if (log.isDebugEnabled()) log.debug("Adding partition assignments: " + assigns); @@ -280,18 +281,8 @@ public class GridDhtPartitionDemander { }); } - if (forcedRebFut != null) { - fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { - @Override public void apply(IgniteInternalFuture<Boolean> future) { - try { - forcedRebFut.onDone(future.get()); - } - catch (Exception e) { - forcedRebFut.onDone(e); - } - } - }); - } + if (forcedRebFut != null) + forcedRebFut.add(fut); rebalanceFut = fut; http://git-wip-us.apache.org/repos/asf/ignite/blob/c098d75d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 9ee8734..a7122b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -73,6 +73,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -213,7 +214,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private final ConcurrentMap<UUID, GridDhtPartitionsAbstractMessage> msgs = new ConcurrentHashMap8<>(); /** Forced Rebalance future. */ - private GridFutureAdapter<Boolean> forcedRebFut; + private GridCompoundFuture<Boolean, Boolean> forcedRebFut; /** * Dummy future created to trigger reassignments if partition @@ -253,7 +254,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param forcedRebFut Forced Rebalance future. */ public GridDhtPartitionsExchangeFuture(GridCacheSharedContext cctx, DiscoveryEvent discoEvt, - GridDhtPartitionExchangeId exchId, GridFutureAdapter<Boolean> forcedRebFut) { + GridDhtPartitionExchangeId exchId, GridCompoundFuture<Boolean, Boolean> forcedRebFut) { dummy = false; forcePreload = true; @@ -452,7 +453,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * @return Forced Rebalance future. */ - @Nullable public GridFutureAdapter<Boolean> forcedRebalanceFuture() { + @Nullable public GridCompoundFuture<Boolean, Boolean> forcedRebalanceFuture() { return forcedRebFut; } http://git-wip-us.apache.org/repos/asf/ignite/blob/c098d75d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index a931ef4..136202e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -328,7 +328,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { boolean forceRebalance, int cnt, Runnable next, - @Nullable GridFutureAdapter<Boolean> forcedRebFut) { + @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut) { return demander.addAssignments(assignments, forceRebalance, cnt, next, forcedRebFut); } http://git-wip-us.apache.org/repos/asf/ignite/blob/c098d75d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/CacheManualRebalancingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/CacheManualRebalancingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/CacheManualRebalancingTest.java new file mode 100644 index 0000000..362cfa2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/CacheManualRebalancingTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.rebalancing; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.compute.ComputeTaskFuture; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** */ +public class CacheManualRebalancingTest extends GridCommonAbstractTest { + /** */ + private static final String MYCACHE = "mycache"; + + /** */ + public static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + public static final int NODES_CNT = 2; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setCacheConfiguration(cacheConfiguration(), new CacheConfiguration(DEFAULT_CACHE_NAME)); + + return cfg; + } + + /** + * @return Cache configuration. + */ + private static CacheConfiguration cacheConfiguration() { + return new CacheConfiguration(MYCACHE) + .setAtomicityMode(ATOMIC) + .setCacheMode(CacheMode.PARTITIONED) + .setWriteSynchronizationMode(FULL_SYNC) + .setRebalanceMode(ASYNC) + .setRebalanceDelay(-1) + .setBackups(1) + .setCopyOnRead(true) + .setReadFromBackup(true); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(NODES_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 400_000; + } + + /** + * @throws Exception If failed. + */ + public void testRebalance() throws Exception { + // Fill cache with large dataset to make rebalancing slow. + try (IgniteDataStreamer<Object, Object> streamer = grid(0).dataStreamer(MYCACHE)) { + for (int i = 0; i < 100_000; i++) + streamer.addData(i, i); + } + + // Start new node. + final IgniteEx newNode = startGrid(NODES_CNT); + + int newNodeCacheSize; + + // Start manual rebalancing. + IgniteCompute compute = newNode.compute().withAsync(); + + compute.broadcast(new MyCallable()); + + final ComputeTaskFuture<Object> rebalanceTaskFuture = compute.future(); + + boolean rebalanceFinished = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return rebalanceTaskFuture.isDone(); + } + }, 10_000); + + assertTrue(rebalanceFinished); + + assertTrue(newNode.context().cache().cache(MYCACHE).context().preloader().rebalanceFuture().isDone()); + + newNodeCacheSize = newNode.cache(MYCACHE).localSize(CachePeekMode.ALL); + + System.out.println("New node cache local size: " + newNodeCacheSize); + + assertTrue(newNodeCacheSize > 0); + + } + + /** */ + public static class MyCallable implements IgniteRunnable { + /** */ + @IgniteInstanceResource + Ignite localNode; + + /** {@inheritDoc} */ + @Override public void run() { + IgniteLogger log = localNode.log(); + + log.info("Start local rebalancing caches"); + + for (String cacheName : localNode.cacheNames()) { + IgniteCache<?, ?> cache = localNode.cache(cacheName); + + assertNotNull(cache); + + boolean finished; + + log.info("Start rebalancing cache: " + cacheName + ", size: " + cache.localSize()); + + do { + IgniteFuture<?> rebalance = cache.rebalance(); + + log.info("Wait rebalancing cache: " + cacheName + " - " + rebalance); + + finished = (Boolean)rebalance.get(); + + log.info("Rebalancing cache: " + cacheName + " - " + rebalance); + + if (finished) { + log.info("Finished rebalancing cache: " + cacheName + ", size: " + + cache.localSize(CachePeekMode.PRIMARY) + cache.localSize(CachePeekMode.BACKUP)); + } else + log.info("Rescheduled rebalancing cache: " + cacheName + ", size: " + cache.localSize()); + } + while (!finished); + } + + log.info("Finished local rebalancing caches"); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c098d75d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java index 4134dc1..3e6f3c3 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheGroups import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePartitionLossPolicySelfTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxIteratorSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.IgniteCacheAtomicProtocolTest; +import org.apache.ignite.internal.processors.cache.distributed.rebalancing.CacheManualRebalancingTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheSyncRebalanceModeSelfTest; import org.apache.ignite.internal.processors.cache.store.IgniteCacheWriteBehindNoUpdateSelfTest; @@ -78,6 +79,7 @@ public class IgniteCacheTestSuite5 extends TestSuite { suite.addTestSuite(IgniteCacheGroupsPartitionLossPolicySelfTest.class); suite.addTestSuite(CacheRebalancingSelfTest.class); + suite.addTestSuite(CacheManualRebalancingTest.class); // Affinity tests. suite.addTestSuite(GridCacheAffinityBackupsSelfTest.class);
