1093

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eeb313c4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eeb313c4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eeb313c4

Branch: refs/heads/ignite-1093-2
Commit: eeb313c44980ac96e96890d823602716f815e098
Parents: b7e9179
Author: Anton Vinogradov <[email protected]>
Authored: Fri Sep 25 14:01:41 2015 +0300
Committer: Anton Vinogradov <[email protected]>
Committed: Fri Sep 25 14:01:41 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionSupplier.java | 56 ++++++++++----------
 .../dht/preloader/GridDhtPreloader.java         | 15 ++++--
 .../GridCacheRebalancingSyncSelfTest.java       | 50 +++++++++--------
 3 files changed, 69 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb313c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index a4bd134..e23a50b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -24,6 +24,7 @@ import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.CacheRebalancingEvent;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -45,6 +46,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.jsr166.ConcurrentHashMap8;
 
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -69,8 +71,9 @@ class GridDhtPartitionSupplier {
     /** Preload predicate. */
     private IgnitePredicate<GridCacheEntryInfo> preloadPred;
 
-    /** Supply context map. */
-    private final ConcurrentHashMap8<T4, SupplyContext> scMap = new ConcurrentHashMap8<>();
+    /** Supply context map. T4: nodeId, idx, topologyVersion, updateSequence. */
+    private final ConcurrentHashMap8<T4<UUID, Integer, AffinityTopologyVersion, Long>, SupplyContext> scMap =
+        new ConcurrentHashMap8<>();
 
     /** Rebalancing listener. */
     private GridLocalEventListener lsnr;
@@ -97,7 +100,27 @@ class GridDhtPartitionSupplier {
         lsnr = new GridLocalEventListener() {
             @Override public void onEvent(Event evt) {
                 if (evt instanceof DiscoveryEvent) {
-                    clearContexts(scMap, log, cctx);
+                    for (Map.Entry<T4<UUID, Integer, AffinityTopologyVersion, Long>, SupplyContext> entry : scMap.entrySet()) {
+                        T4<UUID, Integer, AffinityTopologyVersion, Long> t = entry.getKey();
+
+                        SupplyContext sc = entry.getValue();
+
+                       if (t.get3() != null && !t.get3().equals(cctx.affinity().affinityTopologyVersion()) && sc != null)
+                            clearContext(scMap, t, sc, log);
+                    }
+                }
+                else if (evt instanceof CacheRebalancingEvent) {
+                    CacheRebalancingEvent e = (CacheRebalancingEvent)evt;
+
+                    if (cctx.name().equals(e.cacheName())) {
+                        UUID id = e.discoveryNode().id();
+
+                        for (Map.Entry<T4<UUID, Integer, AffinityTopologyVersion, Long>, SupplyContext> entry : scMap.entrySet()) {
+                            if (id.equals(entry.getKey().get1()))
+                                clearContext(scMap, entry.getKey(), entry.getValue(), log);
+                        }
+
+                    }
                 }
                 else {
                     assert false;
@@ -105,9 +128,7 @@ class GridDhtPartitionSupplier {
             }
         };
 
-        //todo: rebalance stopped.
-
-        cctx.events().addListener(lsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
+        cctx.events().addListener(lsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_CACHE_REBALANCE_STOPPED);
 
         startOldListeners();
     }
@@ -122,25 +143,6 @@ class GridDhtPartitionSupplier {
     }
 
     /**
-     * Clear contexts.
-     *
-     * @param map Context map.
-     * @param log Logger.
-     * @param cctx Context.
-     */
-    private static void clearContexts(
-        ConcurrentHashMap8<T4, SupplyContext> map, IgniteLogger log, GridCacheContext<?, ?> cctx) {
-        for (Map.Entry<T4, SupplyContext> entry : map.entrySet()) {
-            T4 t = entry.getKey();
-
-            SupplyContext sc = entry.getValue();
-
-            if (t.get3() != null && !t.get3().equals(cctx.affinity().affinityTopologyVersion()) && sc != null)
-                clearContext(map, t, sc, log);
-        }
-    }
-
-    /**
      * Clear context.
      *
      * @param map Context map.
@@ -150,8 +152,8 @@ class GridDhtPartitionSupplier {
      * @return true in case context was removed.
      */
     private static boolean clearContext(
-        final ConcurrentHashMap8<T4, SupplyContext> map,
-        final T4 t,
+        final ConcurrentHashMap8<T4<UUID, Integer, AffinityTopologyVersion, Long>, SupplyContext> map,
+        final T4<UUID, Integer, AffinityTopologyVersion, Long> t,
         final SupplyContext sc,
         final IgniteLogger log) {
         final Iterator it = sc.entryIt;

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb313c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 30b5505..ec9b8e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -27,7 +27,6 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -60,13 +59,13 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
 import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
 
 /**
@@ -376,6 +375,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     /** {@inheritDoc} */
     public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s) {
         demandLock.readLock().lock();
+
         try {
             demander.handleSupplyMessage(idx, id, s);
         }
@@ -391,7 +391,14 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException {
-        demander.addAssignments(assignments, forcePreload);
+        demandLock.writeLock().lock();
+
+        try {
+            demander.addAssignments(assignments, forcePreload);
+        }
+        finally {
+            demandLock.writeLock().unlock();
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb313c4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 42c1857..be8e24b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -169,7 +169,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     protected void checkData(Ignite ignite, String name, int from) throws IgniteCheckedException {
         for (int i = from; i < from + TEST_SIZE; i++) {
             if (i % (TEST_SIZE / 10) == 0)
-                log.info("Checked " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
+                log.info("<" + name + "> Checked " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
 
             assert ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode()) :
                 i + " value " + (i + name.hashCode()) + " does not match (" + ignite.cache(name).get(i) + ")";
@@ -252,12 +252,27 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
                     break;
                 }
-                else
-                    fut.get();
+                else if (!fut.get()) {
+                    finished = false;
+
+                    log.warning("Rebalancing finished with missed partitions.");
+                }
             }
         }
     }
 
+    private void test() throws Exception {
+        while (true) {
+            testComplexRebalancing();
+
+            U.sleep(5000);
+        }
+    }
+
+    @Override protected long getTestTimeout() {
+        return Long.MAX_VALUE;
+    }
+
     /**
      * @throws Exception
      */
@@ -270,7 +285,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
         long start = System.currentTimeMillis();
 
-        new Thread() {
+        Thread t1 = new Thread() {
             @Override public void run() {
                 try {
                     startGrid(1);
@@ -295,9 +310,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
                     e.printStackTrace();
                 }
             }
-        }.start();
+        };
 
-        new Thread() {
+        Thread t2 = new Thread() {
             @Override public void run() {
                 try {
                     startGrid(3);
@@ -309,50 +324,43 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
                     e.printStackTrace();
                 }
             }
-        }.start();// Should cancel current rebalancing.
+        };
 
-        while (!concurrentStartFinished || !concurrentStartFinished2) {
-            U.sleep(10);
-        }
+        t1.start();
+        t2.start();// Should cancel t1 rebalancing.
 
-        //wait until cache rebalanced in async mode
+        t1.join();
+        t2.join();
 
         waitForRebalancing(1, 5, 1);
         waitForRebalancing(2, 5, 1);
         waitForRebalancing(3, 5, 1);
         waitForRebalancing(4, 5, 1);
 
-        //cache rebalanced in async node
+        checkData(grid(4), 0);
 
         stopGrid(0);
 
-        //wait until cache rebalanced
         waitForRebalancing(1, 6);
         waitForRebalancing(2, 6);
         waitForRebalancing(3, 6);
         waitForRebalancing(4, 6);
 
-        //cache rebalanced
-
         stopGrid(1);
 
-        //wait until cache rebalanced
         waitForRebalancing(2, 7);
         waitForRebalancing(3, 7);
         waitForRebalancing(4, 7);
 
-        //cache rebalanced
-
         stopGrid(2);
 
-        //wait until cache rebalanced
         waitForRebalancing(3, 8);
         waitForRebalancing(4, 8);
 
-        //cache rebalanced
-
         stopGrid(3);
 
+        waitForRebalancing(4, 9);
+
         long spend = (System.currentTimeMillis() - start) / 1000;
 
         checkData(grid(4), 0);

Reply via email to