IGNITE-5061: Move rebalance enabled methods to GridCacheSharedContext & GridCacheProcessor
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0ece83f0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0ece83f0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0ece83f0 Branch: refs/heads/ignite-3484 Commit: 0ece83f02af219346e85269f0d80112b2faae25e Parents: b08eef2 Author: Alexander Belyak <[email protected]> Authored: Wed Sep 6 17:07:33 2017 +0300 Committer: Andrey Gura <[email protected]> Committed: Wed Sep 6 17:07:33 2017 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/IgniteEx.java | 14 ++++ .../apache/ignite/internal/IgniteKernal.java | 12 +++ .../processors/cache/GridCacheProcessor.java | 8 ++ .../cache/GridCacheSharedContext.java | 20 +++++ .../dht/preloader/GridDhtPartitionDemander.java | 9 ++ .../org/apache/ignite/mxbean/IgniteMXBean.java | 21 +++++ .../cache/CacheRebalancingSelfTest.java | 88 +++++++++++++++++++- .../processors/igfs/IgfsIgniteMock.java | 10 +++ .../junits/multijvm/IgniteProcessProxy.java | 10 +++ 9 files changed, 190 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0ece83f0/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java index 0a44987..53b3e4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java @@ -160,4 +160,18 @@ public interface IgniteEx extends Ignite { * @return Kernal context. */ public GridKernalContext context(); + + /** + * Get rebalance enabled flag. + * + * @return {@code True} if rebalance enabled on node, {@code False} otherwise. 
+ */ + public boolean isRebalanceEnabled(); + + /** + * Set rebalance enable flag on node. + * + * @param rebalanceEnabled rebalance enabled flag. + */ + public void rebalanceEnabled(boolean rebalanceEnabled); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0ece83f0/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index cfad8b4..a3b8651 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -461,6 +461,18 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** {@inheritDoc} */ + @Override + public boolean isRebalanceEnabled() { + return ctx.cache().context().isRebalanceEnabled(); + } + + /** {@inheritDoc} */ + @Override + public void rebalanceEnabled(boolean rebalanceEnabled) { + ctx.cache().context().rebalanceEnabled(rebalanceEnabled); + } + + /** {@inheritDoc} */ @Override public long getUpTime() { return U.currentTimeMillis() - startTime; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0ece83f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index bd950fa..5d84809 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -279,6 +279,14 @@ public class GridCacheProcessor extends 
GridProcessorAdapter { } /** + * Start cache rebalance. + */ + public void enableRebalance() { + for (IgniteCacheProxy c : publicCaches()) + c.rebalance(); + } + + /** * Create exchange worker task for custom discovery message. * * @param msg Custom discovery message. http://git-wip-us.apache.org/repos/asf/ignite/blob/0ece83f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 82d960a..d8614b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -158,6 +158,9 @@ public class GridCacheSharedContext<K, V> { /** Concurrent DHT atomic updates counters. */ private AtomicIntegerArray dhtAtomicUpdCnt; + /** Rebalance enabled flag. */ + private boolean rebalanceEnabled = true; + /** */ private final List<IgniteChangeGlobalStateSupport> stateAwareMgrs; @@ -304,6 +307,23 @@ public class GridCacheSharedContext<K, V> { } /** + * @return rebalance enabled flag. + */ + public boolean isRebalanceEnabled() { + return this.rebalanceEnabled; + } + + /** + * @param rebalanceEnabled rebalance enabled flag. + */ + public void rebalanceEnabled(boolean rebalanceEnabled) { + this.rebalanceEnabled = rebalanceEnabled; + + if (rebalanceEnabled) + cache().enableRebalance(); + } + + /** * @param reconnectFut Reconnect future. * @throws IgniteCheckedException If failed. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/0ece83f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 2258187..54661ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -402,6 +402,15 @@ public class GridDhtPartitionDemander { return; } + if (!ctx.kernalContext().grid().isRebalanceEnabled()) { + if (log.isDebugEnabled()) + log.debug("Cancel partition demand because rebalance disabled on current node."); + + fut.cancel(); + + return; + } + synchronized (fut) { // Synchronized to prevent consistency issues in case of parallel cancellation. if (fut.isDone()) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/0ece83f0/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java index ce63e4f..428d03c 100644 --- a/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java @@ -51,6 +51,27 @@ public interface IgniteMXBean { public String getStartTimestampFormatted(); /** + * Gets rebalance enabled flag. + * + * @return Rebalance enabled flag. 
+ */ + @MXBeanDescription("Rebalance enabled flag.") + public boolean isRebalanceEnabled(); + + /** + * Enable or disable cache partition rebalance per node. + * + * @param rebalanceEnabled If {@code true} then set rebalance to enabled state. + */ + @MXBeanParametersDescriptions( + { + "Enable cache partitions rebalance on node.", + "Disable cache partitions rebalance on node." + } + ) + public void rebalanceEnabled(boolean rebalanceEnabled); + + /** * Gets string presentation of up-time for the kernal. * * @return String presentation of up-time for the kernal. http://git-wip-us.apache.org/repos/asf/ignite/blob/0ece83f0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java index f5ae59d..421ff08 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java @@ -18,24 +18,43 @@ package org.apache.ignite.internal.processors.cache; +import com.sun.org.apache.regexp.internal.RE; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.lang.IgniteFuture; +import 
org.apache.ignite.testframework.GridTestUtils; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + /** * Test for rebalancing. */ public class CacheRebalancingSelfTest extends GridCommonAbstractTest { + + /** Cache name with one backup. */ + private static final String REBALANCE_TEST_CACHE_NAME = "rebalanceCache"; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)); + CacheConfiguration<Integer,Integer> rebalabceCacheCfg = new CacheConfiguration<>(); + rebalabceCacheCfg.setBackups(1); + rebalabceCacheCfg.setName(REBALANCE_TEST_CACHE_NAME); + + cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME), rebalabceCacheCfg); return cfg; } @@ -75,8 +94,73 @@ public class CacheRebalancingSelfTest extends GridCommonAbstractTest { * @return Internal future. */ private static IgniteInternalFuture internalFuture(IgniteFuture fut) { - assert fut instanceof IgniteFutureImpl : fut; + assertTrue(fut.toString(), fut instanceof IgniteFutureImpl); return ((IgniteFutureImpl) fut).internalFuture(); } + + /** + * Test local cache size with and without rebalancing in case of topology change. + * + * @throws Exception If failed. 
+ */ + public void testDisableRebalancing() throws Exception { + IgniteEx ig0 = startGrid(0); + IgniteEx ig1 = startGrid(1); + startGrid(2); + + ig1.rebalanceEnabled(false); + + Random r = new Random(); + + int totalKeysCount = 10240; + + IgniteCache<Integer, Integer> cache = ig0.getOrCreateCache(REBALANCE_TEST_CACHE_NAME); + + for (int i = 0;i < totalKeysCount;i++) + cache.put(r.nextInt(), 1); + + + testLocalCacheSize(ig0, 0, totalKeysCount); + int before_ig1 = testLocalCacheSize(ig1, 0, totalKeysCount); + + stopGrid(2); + + testLocalCacheSize(ig0, totalKeysCount, null); + testLocalCacheSize(ig1, before_ig1, null); + + + ig1.rebalanceEnabled(true); + + testLocalCacheSize(ig0, totalKeysCount, null); + testLocalCacheSize(ig1, totalKeysCount, null); + } + + /** + * Test if test cache on specified node has correct local size. + * + * @param ignite node to test + * @param expFrom left bound + * @param expTo right bound (or {@code null}) + * @return actual local cache size + * @throws IgniteInterruptedCheckedException + */ + private int testLocalCacheSize(IgniteEx ignite, final Integer expFrom, final Integer expTo) throws IgniteInterruptedCheckedException { + final IgniteCache cache = ignite.cache(REBALANCE_TEST_CACHE_NAME); + + boolean isOk = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + Integer actualSize = cache.localSize(CachePeekMode.ALL); + + return expTo == null ? expFrom.equals(actualSize) : expFrom <= actualSize && actualSize <= expTo; + } + }, 10_000); + + int rslt = cache.localSize(CachePeekMode.ALL); + + assertTrue(ignite.configuration().getIgniteInstanceName() + " cache local size = " + + rslt + " not " + (expTo == null ? 
"equal " + expFrom : "in " + expFrom + "-" + expTo), isOk); + + return rslt; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0ece83f0/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java index df3c1ea..1e5fcd1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java @@ -178,6 +178,16 @@ public class IgfsIgniteMock implements IgniteEx { return null; } + @Override + public boolean isRebalanceEnabled() { + return true; + } + + @Override + public void rebalanceEnabled(boolean rebalanceEnabled) { + throwUnsupported(); + } + /** {@inheritDoc} */ @Override public String name() { return name; http://git-wip-us.apache.org/repos/asf/ignite/blob/0ece83f0/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java index 4d51853..3075920 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java @@ -418,6 +418,16 @@ public class IgniteProcessProxy implements IgniteEx { throw new UnsupportedOperationException("Operation isn't supported yet."); } + @Override + public boolean isRebalanceEnabled() { + return true; + } + + @Override + public void 
rebalanceEnabled(boolean rebalanceEnabled) { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + /** {@inheritDoc} */ @Override public IgniteCompute compute() { throw new UnsupportedOperationException("Operation isn't supported yet.");
