Repository: ignite Updated Branches: refs/heads/ignite-1093-2 00140b310 -> 0560a5a55
1093 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0560a5a5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0560a5a5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0560a5a5 Branch: refs/heads/ignite-1093-2 Commit: 0560a5a55350ac1c4911b548084a4ed338ec524e Parents: 00140b3 Author: Anton Vinogradov <[email protected]> Authored: Wed Oct 14 17:38:25 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Wed Oct 14 17:38:25 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCachePreloader.java | 5 ++ .../cache/GridCachePreloaderAdapter.java | 6 +- .../dht/preloader/GridDhtPartitionDemander.java | 65 +++++++++++++------- .../dht/preloader/GridDhtPreloader.java | 6 ++ .../GridCacheRebalancingSyncSelfTest.java | 2 +- 5 files changed, 61 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0560a5a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 08aec71..9555bf4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -122,6 +122,11 @@ public interface GridCachePreloader { public IgniteInternalFuture<?> syncFuture(); /** + * @return Future which will complete when preloading is finished on current topology. + */ + public IgniteInternalFuture<Boolean> rebalanceFuture(); + + /** * Requests that preloader sends the request for the key. * * @param keys Keys to request. http://git-wip-us.apache.org/repos/asf/ignite/blob/0560a5a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index be616f4..4aba537 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache; import java.util.Collection; import java.util.UUID; -import java.util.concurrent.Callable; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.affinity.AffinityFunction; @@ -118,6 +117,11 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Boolean> rebalanceFuture() { + return new GridFinishedFuture<>(true); + } + + /** {@inheritDoc} */ @Override public void unwindUndeploys() { cctx.deploy().unwind(cctx); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0560a5a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index e68209e..27d433c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -97,7 +96,11 @@ public class GridDhtPartitionDemander { /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */ @GridToStringInclude - private volatile SyncFuture syncFut; + private final GridFutureAdapter syncFut = new GridFutureAdapter(); + + /** Rebalance future. */ + @GridToStringInclude + private volatile RebalanceFuture rebalanceFut; /** Last timeout object. */ private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>(); @@ -122,11 +125,13 @@ public class GridDhtPartitionDemander { boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode(); - syncFut = new SyncFuture();//Dummy. + rebalanceFut = new RebalanceFuture();//Dummy. - if (!enabled) + if (!enabled) { // Calling onDone() immediately since preloading is disabled. + rebalanceFut.onDone(true); syncFut.onDone(); + } } /** @@ -139,7 +144,7 @@ public class GridDhtPartitionDemander { * Stop. */ void stop() { - syncFut.cancel(); + rebalanceFut.cancel(); lastExchangeFut = null; @@ -154,6 +159,13 @@ public class GridDhtPartitionDemander { } /** + * @return Rebalance future. + */ + IgniteInternalFuture<Boolean> rebalanceFuture() { + return rebalanceFut; + } + + /** * Sets preload predicate for demand pool. * * @param preloadPred Preload predicate. @@ -191,10 +203,10 @@ public class GridDhtPartitionDemander { * @param fut Future. * @return {@code True} if topology changed. */ - private boolean topologyChanged(SyncFuture fut) { + private boolean topologyChanged(RebalanceFuture fut) { return !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || // Topology already changed. - fut != syncFut; // Same topology, but dummy exchange forced because of missing partitions. + fut != rebalanceFut; // Same topology, but dummy exchange forced because of missing partitions. } /** @@ -212,12 +224,12 @@ public class GridDhtPartitionDemander { * @param name Cache name. * @param fut Future. */ - private void waitForCacheRebalancing(String name, SyncFuture fut) { + private void waitForCacheRebalancing(String name, RebalanceFuture fut) { if (log.isDebugEnabled()) log.debug("Waiting for " + name + " cache rebalancing [cacheName=" + cctx.name() + ']'); try { - SyncFuture wFut = (SyncFuture)cctx.kernalContext().cache().internalCache(name).preloader().syncFuture(); + RebalanceFuture wFut = (RebalanceFuture)cctx.kernalContext().cache().internalCache(name).preloader().rebalanceFuture(); if (!topologyChanged(fut) && wFut.updateSeq == fut.updateSeq) { if (!wFut.get()) @@ -258,9 +270,9 @@ public class GridDhtPartitionDemander { if (delay == 0 || force) { assert assigns != null; - final SyncFuture oldFut = syncFut; + final RebalanceFuture oldFut = rebalanceFut; - final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isInitial(), cnt); + final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, oldFut.isInitial(), cnt); if (!oldFut.isInitial()) oldFut.cancel(); @@ -271,7 +283,7 @@ public class GridDhtPartitionDemander { } }); - syncFut = fut; + rebalanceFut = fut; if (cctx.shared().exchange().hasPendingExchange()) { // Will rebalance at actual topology. U.log(log, "Skipping obsolete exchange. [top=" + assigns.topologyVersion() + "]"); @@ -338,7 +350,7 @@ public class GridDhtPartitionDemander { /** * @param fut Future. */ - private void requestPartitions(SyncFuture fut, GridDhtPreloaderAssignments assigns) { + private void requestPartitions(RebalanceFuture fut, GridDhtPreloaderAssignments assigns) { for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) { if (topologyChanged(fut)) { fut.cancel(); @@ -479,7 +491,7 @@ public class GridDhtPartitionDemander { final GridDhtPartitionSupplyMessageV2 supply) { AffinityTopologyVersion topVer = supply.topologyVersion(); - final SyncFuture fut = syncFut; + final RebalanceFuture fut = rebalanceFut; ClusterNode node = cctx.node(id); @@ -716,7 +728,7 @@ public class GridDhtPartitionDemander { /** * */ - public static class SyncFuture extends GridFutureAdapter<Boolean> { + public static class RebalanceFuture extends GridFutureAdapter<Boolean> { /** */ private static final long serialVersionUID = 1L; @@ -754,7 +766,7 @@ public class GridDhtPartitionDemander { * @param log Logger. * @param sentStopEvnt Stop event flag. */ - SyncFuture(GridDhtPreloaderAssignments assigns, + RebalanceFuture(GridDhtPreloaderAssignments assigns, GridCacheContext<?, ?> cctx, IgniteLogger log, boolean sentStopEvnt, @@ -772,7 +784,7 @@ public class GridDhtPartitionDemander { cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 1).listen( new CI1<IgniteInternalFuture<Long>>() { @Override public void apply(IgniteInternalFuture<Long> future) { - SyncFuture.this.cancel(); + RebalanceFuture.this.cancel(); } }); // todo: is it necessary? } @@ -780,7 +792,7 @@ public class GridDhtPartitionDemander { /** * Dummy future. Will be done by real one. */ - public SyncFuture() { + public RebalanceFuture() { this.exchFut = null; this.topVer = null; this.cctx = null; @@ -866,7 +878,7 @@ public class GridDhtPartitionDemander { U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name() + ", topology=" + topologyVersion()); - checkIsDone(); + checkIsDone(true /* cancelled */); } finally { lock.unlock(); @@ -980,6 +992,14 @@ public class GridDhtPartitionDemander { * */ private void checkIsDone() { + checkIsDone(false); + } + + /** + * + * @param cancelled Is cancelled. + */ + private void checkIsDone(boolean cancelled) { if (remaining.isEmpty()) { if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStoppedEvnt)) preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent()); @@ -1009,6 +1029,9 @@ public class GridDhtPartitionDemander { cctx.shared().exchange().scheduleResendPartitions(); } + if (!cancelled && !cctx.preloader().syncFuture().isDone()) + ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone(); + onDone(true); } } @@ -1086,12 +1109,12 @@ public class GridDhtPartitionDemander { /** Hide worker logger and use cache logger instead. */ private IgniteLogger log = GridDhtPartitionDemander.this.log; - private volatile SyncFuture fut; + private volatile RebalanceFuture fut; /** * @param id Worker ID. */ - private DemandWorker(int id, SyncFuture fut) { + private DemandWorker(int id, RebalanceFuture fut) { assert id >= 0; this.id = id; http://git-wip-us.apache.org/repos/asf/ignite/blob/0560a5a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 4a6a235..0080406 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.future.GridCompoundFuture; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.CI1; @@ -424,6 +425,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { return cctx.kernalContext().clientNode() ? startFut : demander.syncFuture(); } + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Boolean> rebalanceFuture() { + return cctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture(); + } + /** * @param topVer Requested topology version. * @param fut Future to add. http://git-wip-us.apache.org/repos/asf/ignite/blob/0560a5a5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java index 39d7c18..2b75adf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -300,7 +300,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { finished = true; for (GridCacheAdapter c : grid(id).context().cache().internalCaches()) { - GridDhtPartitionDemander.SyncFuture fut = (GridDhtPartitionDemander.SyncFuture)c.preloader().syncFuture(); + GridDhtPartitionDemander.RebalanceFuture fut = (GridDhtPartitionDemander.RebalanceFuture)c.preloader().rebalanceFuture(); if (fut.topologyVersion() == null || !fut.topologyVersion().equals(top)) { finished = false;
