Repository: ignite
Updated Branches:
  refs/heads/master 08f98e384 -> efa326928


IGNITE-7366 Affinity assignment exception in service processor during multiple 
nodes join - Fixes #4321.

Signed-off-by: Ivan Rakov <ira...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/efa32692
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/efa32692
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/efa32692

Branch: refs/heads/master
Commit: efa3269280f6c54b93e13af2fa56c7d27b46208a
Parents: 08f98e3
Author: Pereslegin Pavel <sbt-pereslegin-...@mail.ca.sbrf.ru>
Authored: Wed Jul 11 17:25:34 2018 +0300
Committer: Ivan Rakov <ira...@apache.org>
Committed: Wed Jul 11 17:25:34 2018 +0300

----------------------------------------------------------------------
 .../service/GridServiceProcessor.java           | 61 +++++++++++----
 .../GridServiceReassignmentSelfTest.java        | 45 ++++++++---
 .../service/IgniteServiceReassignmentTest.java  | 79 ++++++++++++++++++++
 3 files changed, 162 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/efa32692/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 0e4a318..f8c4b73 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -69,6 +69,7 @@ import 
org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import 
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage;
 import 
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.query.CacheQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
@@ -1739,6 +1740,47 @@ public class GridServiceProcessor extends 
GridProcessorAdapter implements Ignite
         /** */
         private volatile AffinityTopologyVersion currTopVer = null;
 
+        /**
+         * Check that listening-in topology version is the latest and wait 
until exchange is finished.
+         *
+         * @param initTopVer listening-in topology version.
+         * @return {@code True} if current event is not last and should be 
skipped.
+         */
+        private boolean skipExchange(AffinityTopologyVersion initTopVer) {
+            AffinityTopologyVersion pendingTopVer = null;
+            AffinityTopologyVersion newTopVer = currTopVer;
+
+            if (!initTopVer.equals(newTopVer))
+                pendingTopVer = newTopVer;
+            else {
+                GridDhtTopologyFuture fut = 
ctx.cache().context().exchange().lastTopologyFuture();
+
+                if (!fut.isDone() && !fut.isCancelled()) {
+                    try {
+                        fut.get();
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw U.convertException(e);
+                    }
+                }
+
+                AffinityTopologyVersion lastTopVer;
+
+                // If exchange already moved forward - skip current version.
+                if (fut.exchangeDone() && newTopVer.compareTo(lastTopVer = 
fut.topologyVersion()) < 0)
+                    pendingTopVer = lastTopVer;
+            }
+
+            if (pendingTopVer != null && log.isInfoEnabled()) {
+                log.info("Service processor detected a topology change during 
" +
+                    "assignments calculation (will abort current iteration and 
" +
+                    "re-calculate on the newer version): " +
+                    "[topVer=" + initTopVer + ", newTopVer=" + pendingTopVer + 
']');
+            }
+
+            return pendingTopVer != null;
+        }
+
         /** {@inheritDoc} */
         @Override public void onEvent(final DiscoveryEvent evt, final 
DiscoCache discoCache) {
             GridSpinBusyLock busyLock = GridServiceProcessor.this.busyLock;
@@ -1792,17 +1834,8 @@ public class GridServiceProcessor extends 
GridProcessorAdapter implements Ignite
 
                                 while (it.hasNext()) {
                                     // If topology changed again, let next 
event handle it.
-                                    AffinityTopologyVersion currTopVer0 = 
currTopVer;
-
-                                    if (currTopVer0 != topVer) {
-                                        if (log.isInfoEnabled())
-                                            log.info("Service processor 
detected a topology change during " +
-                                                "assignments calculation (will 
abort current iteration and " +
-                                                "re-calculate on the newer 
version): " +
-                                                "[topVer=" + topVer + ", 
newTopVer=" + currTopVer0 + ']');
-
+                                    if (skipExchange(topVer))
                                         return;
-                                    }
 
                                     Cache.Entry<Object, Object> e = it.next();
 
@@ -1811,12 +1844,10 @@ public class GridServiceProcessor extends 
GridProcessorAdapter implements Ignite
                                     try {
                                         
svcName.set(dep.configuration().getName());
 
-                                        
ctx.cache().context().exchange().affinityReadyFuture(topVer).get();
-
                                         reassign(dep, topVer);
                                     }
                                     catch (IgniteCheckedException ex) {
-                                        if (!(e instanceof 
ClusterTopologyCheckedException))
+                                        if (!(ex instanceof 
ClusterTopologyCheckedException))
                                             LT.error(log, ex, "Failed to do 
service reassignment (will retry): " +
                                                 dep.configuration().getName());
 
@@ -1838,6 +1869,10 @@ public class GridServiceProcessor extends 
GridProcessorAdapter implements Ignite
                         // Clean up zombie assignments.
                         IgniteInternalCache<Object, Object> cache = 
serviceCache();
 
+                        // If topology changed again, let next event handle it.
+                        if (skipExchange(topVer))
+                            return;
+
                         while (it.hasNext()) {
                             Cache.Entry<Object, Object> e = it.next();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/efa32692/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceReassignmentSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceReassignmentSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceReassignmentSelfTest.java
index 0f5d595..e44c8ea 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceReassignmentSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceReassignmentSelfTest.java
@@ -29,11 +29,15 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
 
 /**
  * Tests service reassignment.
  */
 public class GridServiceReassignmentSelfTest extends 
GridServiceProcessorAbstractSelfTest {
+    /** */
+    private static final String SERVICE_NAME = "testService";
+
     /** {@inheritDoc} */
     @Override protected int nodeCount() {
         return 1;
@@ -71,7 +75,7 @@ public class GridServiceReassignmentSelfTest extends 
GridServiceProcessorAbstrac
      * @throws Exception If failed.
      */
     private CounterService proxy(Ignite g) throws Exception {
-        return g.services().serviceProxy("testService", CounterService.class, 
false);
+        return g.services().serviceProxy(SERVICE_NAME, CounterService.class, 
false);
     }
 
     /**
@@ -82,9 +86,9 @@ public class GridServiceReassignmentSelfTest extends 
GridServiceProcessorAbstrac
     private void checkReassigns(int total, int maxPerNode) throws Exception {
         CountDownLatch latch = new CountDownLatch(nodeCount());
 
-        DummyService.exeLatch("testService", latch);
+        DummyService.exeLatch(SERVICE_NAME, latch);
 
-        grid(0).services().deployMultiple("testService", new 
CounterServiceImpl(), total, maxPerNode);
+        grid(0).services().deployMultiple(SERVICE_NAME, new 
CounterServiceImpl(), total, maxPerNode);
 
         for (int i = 0; i < 10; i++)
             proxy(randomGrid()).increment();
@@ -104,11 +108,7 @@ public class GridServiceReassignmentSelfTest extends 
GridServiceProcessorAbstrac
                 if (grow) {
                     assert startedGrids.size() < maxTopSize;
 
-                    int gridIdx = nextAvailableIdx(startedGrids, maxTopSize, 
rnd);
-
-                    startGrid(gridIdx);
-
-                    startedGrids.add(gridIdx);
+                    startRandomNodesMultithreaded(maxTopSize, rnd, 
startedGrids);
 
                     if (startedGrids.size() == maxTopSize)
                         grow = false;
@@ -135,7 +135,7 @@ public class GridServiceReassignmentSelfTest extends 
GridServiceProcessorAbstrac
             }
         }
         finally {
-            grid(F.first(startedGrids)).services().cancel("testService");
+            grid(F.first(startedGrids)).services().cancel(SERVICE_NAME);
 
             stopAllGrids();
 
@@ -158,7 +158,7 @@ public class GridServiceReassignmentSelfTest extends 
GridServiceProcessorAbstrac
 
         IgniteInternalCache<GridServiceAssignmentsKey, GridServiceAssignments> 
cache = grid.utilityCache();
 
-        GridServiceAssignments assignments = cache.get(new 
GridServiceAssignmentsKey("testService"));
+        GridServiceAssignments assignments = cache.get(new 
GridServiceAssignmentsKey(SERVICE_NAME));
 
         Collection<UUID> nodes = F.viewReadOnly(grid.cluster().nodes(), 
F.node2id());
 
@@ -186,6 +186,8 @@ public class GridServiceReassignmentSelfTest extends 
GridServiceProcessorAbstrac
         if (total > 0)
             assertTrue("Total number of services limit exceeded [sum=" + sum +
                 ", assigns=" + assignments.assigns() + ']', sum <= total);
+        else
+            assertEquals("Reassign per node failed.", nodes.size(), 
assignments.assigns().size());
 
         if (!lastTry && proxy(grid).get() != 10)
             return false;
@@ -196,6 +198,29 @@ public class GridServiceReassignmentSelfTest extends 
GridServiceProcessorAbstrac
     }
 
     /**
+     * Start 1, 2 or 3 random nodes simultaneously.
+     *
+     * @param limit Cluster size limit.
+     * @param rnd Random generator.
+     * @param grids Collection with indexes of running nodes.
+     * @throws Exception If failed.
+     */
+    private void startRandomNodesMultithreaded(int limit, Random rnd, 
Collection<Integer> grids) throws Exception {
+        int cnt = rnd.nextInt(Math.min(limit - grids.size(), 3)) + 1;
+
+        for (int i = 1; i <= cnt; i++) {
+            int gridIdx = nextAvailableIdx(grids, limit, rnd);
+
+            if (i == cnt)
+                startGrid(gridIdx);
+            else
+                GridTestUtils.runAsync(() -> startGrid(gridIdx));
+
+            grids.add(gridIdx);
+        }
+    }
+
+    /**
      * Gets next available index.
      *
      * @param startedGrids Indexes for started grids.

http://git-wip-us.apache.org/repos/asf/ignite/blob/efa32692/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java
index 8116d1b..865f121 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java
@@ -17,10 +17,16 @@
 
 package org.apache.ignite.internal.processors.service;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.services.Service;
@@ -29,6 +35,7 @@ import org.apache.ignite.services.ServiceContext;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridStringLogger;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
@@ -42,6 +49,12 @@ public class IgniteServiceReassignmentTest extends 
GridCommonAbstractTest {
     /** */
     private ServiceConfiguration srvcCfg;
 
+    /** */
+    private boolean useStrLog;
+
+    /** */
+    private List<IgniteLogger> strLoggers = new ArrayList<>();
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -51,6 +64,16 @@ public class IgniteServiceReassignmentTest extends 
GridCommonAbstractTest {
         if (srvcCfg != null)
             cfg.setServiceConfiguration(srvcCfg);
 
+        if (useStrLog) {
+            GridStringLogger strLog = new GridStringLogger(false, 
cfg.getGridLogger());
+
+            strLog.logLength(100 * 1024);
+
+            cfg.setGridLogger(strLog);
+
+            strLoggers.add(strLog);
+        }
+
         return cfg;
     }
 
@@ -165,6 +188,62 @@ public class IgniteServiceReassignmentTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testZombieAssignmentsCleanup() throws Exception {
+        useStrLog = true;
+
+        final int nodesCnt = 2;
+        final int maxSvc = 30;
+
+        try {
+            startGridsMultiThreaded(nodesCnt);
+
+            IgniteEx ignite = grid(0);
+
+            IgniteInternalCache<GridServiceAssignmentsKey, Object> sysCache = 
ignite.utilityCache();
+
+            List<GridServiceAssignmentsKey> zombieAssignmentsKeys = new 
ArrayList<>(maxSvc);
+
+            // Adding some assignments without deployments.
+            for (int i = 0; i < maxSvc; i++) {
+                String name = "svc-" + i;
+
+                ServiceConfiguration svcCfg = new ServiceConfiguration();
+
+                svcCfg.setName(name);
+
+                GridServiceAssignmentsKey key = new 
GridServiceAssignmentsKey(name);
+
+                UUID nodeId = grid(i % nodesCnt).localNode().id();
+
+                sysCache.put(key, new GridServiceAssignments(svcCfg, nodeId, 
ignite.cluster().topologyVersion()));
+
+                zombieAssignmentsKeys.add(key);
+            }
+
+            // Simulate exchange with merge.
+            GridTestUtils.runAsync(() -> startGrid(nodesCnt));
+            GridTestUtils.runAsync(() -> startGrid(nodesCnt + 1));
+            startGrid(nodesCnt + 2);
+
+            awaitPartitionMapExchange();
+
+            // Checking that all our assignments were removed.
+            for (GridServiceAssignmentsKey key : zombieAssignmentsKeys)
+                assertNull("Found assignment for undeployed service " + 
key.name(), sysCache.get(key));
+
+            for (IgniteLogger logger : strLoggers)
+                assertFalse(logger.toString().contains("Getting affinity for 
topology version earlier than affinity is " +
+                    "calculated"));
+        } finally {
+            useStrLog = false;
+
+            strLoggers.clear();
+        }
+    }
+
+    /**
      * @param node Node.
      * @throws Exception If failed.
      */

Reply via email to