Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-2 00140b310 -> 0560a5a55


1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0560a5a5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0560a5a5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0560a5a5

Branch: refs/heads/ignite-1093-2
Commit: 0560a5a55350ac1c4911b548084a4ed338ec524e
Parents: 00140b3
Author: Anton Vinogradov <[email protected]>
Authored: Wed Oct 14 17:38:25 2015 +0300
Committer: Anton Vinogradov <[email protected]>
Committed: Wed Oct 14 17:38:25 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCachePreloader.java    |  5 ++
 .../cache/GridCachePreloaderAdapter.java        |  6 +-
 .../dht/preloader/GridDhtPartitionDemander.java | 65 +++++++++++++-------
 .../dht/preloader/GridDhtPreloader.java         |  6 ++
 .../GridCacheRebalancingSyncSelfTest.java       |  2 +-
 5 files changed, 61 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0560a5a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 08aec71..9555bf4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -122,6 +122,11 @@ public interface GridCachePreloader {
     public IgniteInternalFuture<?> syncFuture();
 
     /**
+     * @return Future which will complete when preloading is finished on 
current topology.
+     */
+    public IgniteInternalFuture<Boolean> rebalanceFuture();
+
+    /**
      * Requests that preloader sends the request for the key.
      *
      * @param keys Keys to request.

http://git-wip-us.apache.org/repos/asf/ignite/blob/0560a5a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index be616f4..4aba537 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.affinity.AffinityFunction;
@@ -118,6 +117,11 @@ public class GridCachePreloaderAdapter implements 
GridCachePreloader {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<Boolean> rebalanceFuture() {
+        return new GridFinishedFuture<>(true);
+    }
+
+    /** {@inheritDoc} */
     @Override public void unwindUndeploys() {
         cctx.deploy().unwind(cctx);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0560a5a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index e68209e..27d433c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -97,7 +96,11 @@ public class GridDhtPartitionDemander {
 
     /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */
     @GridToStringInclude
-    private volatile SyncFuture syncFut;
+    private final GridFutureAdapter syncFut = new GridFutureAdapter();
+
+    /** Rebalance future. */
+    @GridToStringInclude
+    private volatile RebalanceFuture rebalanceFut;
 
     /** Last timeout object. */
     private AtomicReference<GridTimeoutObject> lastTimeoutObj = new 
AtomicReference<>();
@@ -122,11 +125,13 @@ public class GridDhtPartitionDemander {
 
         boolean enabled = cctx.rebalanceEnabled() && 
!cctx.kernalContext().clientNode();
 
-        syncFut = new SyncFuture();//Dummy.
+        rebalanceFut = new RebalanceFuture();//Dummy.
 
-        if (!enabled)
+        if (!enabled) {
             // Calling onDone() immediately since preloading is disabled.
+            rebalanceFut.onDone(true);
             syncFut.onDone();
+        }
     }
 
     /**
@@ -139,7 +144,7 @@ public class GridDhtPartitionDemander {
      * Stop.
      */
     void stop() {
-        syncFut.cancel();
+        rebalanceFut.cancel();
 
         lastExchangeFut = null;
 
@@ -154,6 +159,13 @@ public class GridDhtPartitionDemander {
     }
 
     /**
+     * @return Rebalance future.
+     */
+    IgniteInternalFuture<Boolean> rebalanceFuture() {
+        return rebalanceFut;
+    }
+
+    /**
      * Sets preload predicate for demand pool.
      *
      * @param preloadPred Preload predicate.
@@ -191,10 +203,10 @@ public class GridDhtPartitionDemander {
      * @param fut Future.
      * @return {@code True} if topology changed.
      */
-    private boolean topologyChanged(SyncFuture fut) {
+    private boolean topologyChanged(RebalanceFuture fut) {
         return
             
!cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || // 
Topology already changed.
-                fut != syncFut; // Same topology, but dummy exchange forced 
because of missing partitions.
+                fut != rebalanceFut; // Same topology, but dummy exchange 
forced because of missing partitions.
     }
 
     /**
@@ -212,12 +224,12 @@ public class GridDhtPartitionDemander {
      * @param name Cache name.
      * @param fut Future.
      */
-    private void waitForCacheRebalancing(String name, SyncFuture fut) {
+    private void waitForCacheRebalancing(String name, RebalanceFuture fut) {
         if (log.isDebugEnabled())
             log.debug("Waiting for " + name + " cache rebalancing [cacheName=" 
+ cctx.name() + ']');
 
         try {
-            SyncFuture wFut = 
(SyncFuture)cctx.kernalContext().cache().internalCache(name).preloader().syncFuture();
+            RebalanceFuture wFut = 
(RebalanceFuture)cctx.kernalContext().cache().internalCache(name).preloader().rebalanceFuture();
 
             if (!topologyChanged(fut) && wFut.updateSeq == fut.updateSeq) {
                 if (!wFut.get())
@@ -258,9 +270,9 @@ public class GridDhtPartitionDemander {
         if (delay == 0 || force) {
             assert assigns != null;
 
-            final SyncFuture oldFut = syncFut;
+            final RebalanceFuture oldFut = rebalanceFut;
 
-            final SyncFuture fut = new SyncFuture(assigns, cctx, log, 
oldFut.isInitial(), cnt);
+            final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, 
log, oldFut.isInitial(), cnt);
 
             if (!oldFut.isInitial())
                 oldFut.cancel();
@@ -271,7 +283,7 @@ public class GridDhtPartitionDemander {
                     }
                 });
 
-            syncFut = fut;
+            rebalanceFut = fut;
 
             if (cctx.shared().exchange().hasPendingExchange()) { // Will 
rebalance at actual topology.
                 U.log(log, "Skipping obsolete exchange. [top=" + 
assigns.topologyVersion() + "]");
@@ -338,7 +350,7 @@ public class GridDhtPartitionDemander {
     /**
      * @param fut Future.
      */
-    private void requestPartitions(SyncFuture fut, GridDhtPreloaderAssignments 
assigns) {
+    private void requestPartitions(RebalanceFuture fut, 
GridDhtPreloaderAssignments assigns) {
         for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : 
assigns.entrySet()) {
             if (topologyChanged(fut)) {
                 fut.cancel();
@@ -479,7 +491,7 @@ public class GridDhtPartitionDemander {
         final GridDhtPartitionSupplyMessageV2 supply) {
         AffinityTopologyVersion topVer = supply.topologyVersion();
 
-        final SyncFuture fut = syncFut;
+        final RebalanceFuture fut = rebalanceFut;
 
         ClusterNode node = cctx.node(id);
 
@@ -716,7 +728,7 @@ public class GridDhtPartitionDemander {
     /**
      *
      */
-    public static class SyncFuture extends GridFutureAdapter<Boolean> {
+    public static class RebalanceFuture extends GridFutureAdapter<Boolean> {
         /** */
         private static final long serialVersionUID = 1L;
 
@@ -754,7 +766,7 @@ public class GridDhtPartitionDemander {
          * @param log Logger.
          * @param sentStopEvnt Stop event flag.
          */
-        SyncFuture(GridDhtPreloaderAssignments assigns,
+        RebalanceFuture(GridDhtPreloaderAssignments assigns,
             GridCacheContext<?, ?> cctx,
             IgniteLogger log,
             boolean sentStopEvnt,
@@ -772,7 +784,7 @@ public class GridDhtPartitionDemander {
                 
cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 
1).listen(
                     new CI1<IgniteInternalFuture<Long>>() {
                         @Override public void apply(IgniteInternalFuture<Long> 
future) {
-                            SyncFuture.this.cancel();
+                            RebalanceFuture.this.cancel();
                         }
                     }); // todo: is it necessary?
         }
@@ -780,7 +792,7 @@ public class GridDhtPartitionDemander {
         /**
          * Dummy future. Will be done by real one.
          */
-        public SyncFuture() {
+        public RebalanceFuture() {
             this.exchFut = null;
             this.topVer = null;
             this.cctx = null;
@@ -866,7 +878,7 @@ public class GridDhtPartitionDemander {
                 U.log(log, "Cancelled rebalancing from all nodes [cache=" + 
cctx.name()
                     + ", topology=" + topologyVersion());
 
-                checkIsDone();
+                checkIsDone(true /* cancelled */);
             }
             finally {
                 lock.unlock();
@@ -980,6 +992,14 @@ public class GridDhtPartitionDemander {
          *
          */
         private void checkIsDone() {
+            checkIsDone(false);
+        }
+
+        /**
+         *
+         * @param cancelled Is cancelled.
+         */
+        private void checkIsDone(boolean cancelled) {
             if (remaining.isEmpty()) {
                 if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && 
(!cctx.isReplicated() || sendStoppedEvnt))
                     preloadEvent(EVT_CACHE_REBALANCE_STOPPED, 
exchFut.discoveryEvent());
@@ -1009,6 +1029,9 @@ public class GridDhtPartitionDemander {
                     cctx.shared().exchange().scheduleResendPartitions();
                 }
 
+                if (!cancelled && !cctx.preloader().syncFuture().isDone())
+                    
((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
+
                 onDone(true);
             }
         }
@@ -1086,12 +1109,12 @@ public class GridDhtPartitionDemander {
         /** Hide worker logger and use cache logger instead. */
         private IgniteLogger log = GridDhtPartitionDemander.this.log;
 
-        private volatile SyncFuture fut;
+        private volatile RebalanceFuture fut;
 
         /**
          * @param id Worker ID.
          */
-        private DemandWorker(int id, SyncFuture fut) {
+        private DemandWorker(int id, RebalanceFuture fut) {
             assert id >= 0;
 
             this.id = id;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0560a5a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 4a6a235..0080406 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -50,6 +50,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -424,6 +425,11 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
         return cctx.kernalContext().clientNode() ? startFut : 
demander.syncFuture();
     }
 
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<Boolean> rebalanceFuture() {
+        return cctx.kernalContext().clientNode() ? new 
GridFinishedFuture<>(true) : demander.rebalanceFuture();
+    }
+
     /**
      * @param topVer Requested topology version.
      * @param fut Future to add.

http://git-wip-us.apache.org/repos/asf/ignite/blob/0560a5a5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 39d7c18..2b75adf 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -300,7 +300,7 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
             finished = true;
 
             for (GridCacheAdapter c : 
grid(id).context().cache().internalCaches()) {
-                GridDhtPartitionDemander.SyncFuture fut = 
(GridDhtPartitionDemander.SyncFuture)c.preloader().syncFuture();
+                GridDhtPartitionDemander.RebalanceFuture fut = 
(GridDhtPartitionDemander.RebalanceFuture)c.preloader().rebalanceFuture();
                 if (fut.topologyVersion() == null || 
!fut.topologyVersion().equals(top)) {
                     finished = false;
 

Reply via email to