IGNITE-7366 Affinity assignment exception in service processor during multiple nodes join - Fixes #4321.
Signed-off-by: Ivan Rakov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/efa32692 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/efa32692 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/efa32692 Branch: refs/heads/ignite-8446 Commit: efa3269280f6c54b93e13af2fa56c7d27b46208a Parents: 08f98e3 Author: Pereslegin Pavel <[email protected]> Authored: Wed Jul 11 17:25:34 2018 +0300 Committer: Ivan Rakov <[email protected]> Committed: Wed Jul 11 17:25:34 2018 +0300 ---------------------------------------------------------------------- .../service/GridServiceProcessor.java | 61 +++++++++++---- .../GridServiceReassignmentSelfTest.java | 45 ++++++++--- .../service/IgniteServiceReassignmentTest.java | 79 ++++++++++++++++++++ 3 files changed, 162 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/efa32692/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 0e4a318..f8c4b73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage; +import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.query.CacheQuery; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; @@ -1739,6 +1740,47 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite /** */ private volatile AffinityTopologyVersion currTopVer = null; + /** + * Check that listening-in topology version is the latest and wait until exchange is finished. + * + * @param initTopVer listening-in topology version. + * @return {@code True} if current event is not last and should be skipped. + */ + private boolean skipExchange(AffinityTopologyVersion initTopVer) { + AffinityTopologyVersion pendingTopVer = null; + AffinityTopologyVersion newTopVer = currTopVer; + + if (!initTopVer.equals(newTopVer)) + pendingTopVer = newTopVer; + else { + GridDhtTopologyFuture fut = ctx.cache().context().exchange().lastTopologyFuture(); + + if (!fut.isDone() && !fut.isCancelled()) { + try { + fut.get(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + AffinityTopologyVersion lastTopVer; + + // If exchange already moved forward - skip current version. 
+ if (fut.exchangeDone() && newTopVer.compareTo(lastTopVer = fut.topologyVersion()) < 0) + pendingTopVer = lastTopVer; + } + + if (pendingTopVer != null && log.isInfoEnabled()) { + log.info("Service processor detected a topology change during " + + "assignments calculation (will abort current iteration and " + + "re-calculate on the newer version): " + + "[topVer=" + initTopVer + ", newTopVer=" + pendingTopVer + ']'); + } + + return pendingTopVer != null; + } + /** {@inheritDoc} */ @Override public void onEvent(final DiscoveryEvent evt, final DiscoCache discoCache) { GridSpinBusyLock busyLock = GridServiceProcessor.this.busyLock; @@ -1792,17 +1834,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite while (it.hasNext()) { // If topology changed again, let next event handle it. - AffinityTopologyVersion currTopVer0 = currTopVer; - - if (currTopVer0 != topVer) { - if (log.isInfoEnabled()) - log.info("Service processor detected a topology change during " + - "assignments calculation (will abort current iteration and " + - "re-calculate on the newer version): " + - "[topVer=" + topVer + ", newTopVer=" + currTopVer0 + ']'); - + if (skipExchange(topVer)) return; - } Cache.Entry<Object, Object> e = it.next(); @@ -1811,12 +1844,10 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite try { svcName.set(dep.configuration().getName()); - ctx.cache().context().exchange().affinityReadyFuture(topVer).get(); - reassign(dep, topVer); } catch (IgniteCheckedException ex) { - if (!(e instanceof ClusterTopologyCheckedException)) + if (!(ex instanceof ClusterTopologyCheckedException)) LT.error(log, ex, "Failed to do service reassignment (will retry): " + dep.configuration().getName()); @@ -1838,6 +1869,10 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite // Clean up zombie assignments. 
IgniteInternalCache<Object, Object> cache = serviceCache(); + // If topology changed again, let next event handle it. + if (skipExchange(topVer)) + return; + while (it.hasNext()) { Cache.Entry<Object, Object> e = it.next(); http://git-wip-us.apache.org/repos/asf/ignite/blob/efa32692/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceReassignmentSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceReassignmentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceReassignmentSelfTest.java index 0f5d595..e44c8ea 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceReassignmentSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceReassignmentSelfTest.java @@ -29,11 +29,15 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; /** * Tests service reassignment. */ public class GridServiceReassignmentSelfTest extends GridServiceProcessorAbstractSelfTest { + /** */ + private static final String SERVICE_NAME = "testService"; + /** {@inheritDoc} */ @Override protected int nodeCount() { return 1; @@ -71,7 +75,7 @@ public class GridServiceReassignmentSelfTest extends GridServiceProcessorAbstrac * @throws Exception If failed. 
*/ private CounterService proxy(Ignite g) throws Exception { - return g.services().serviceProxy("testService", CounterService.class, false); + return g.services().serviceProxy(SERVICE_NAME, CounterService.class, false); } /** @@ -82,9 +86,9 @@ public class GridServiceReassignmentSelfTest extends GridServiceProcessorAbstrac private void checkReassigns(int total, int maxPerNode) throws Exception { CountDownLatch latch = new CountDownLatch(nodeCount()); - DummyService.exeLatch("testService", latch); + DummyService.exeLatch(SERVICE_NAME, latch); - grid(0).services().deployMultiple("testService", new CounterServiceImpl(), total, maxPerNode); + grid(0).services().deployMultiple(SERVICE_NAME, new CounterServiceImpl(), total, maxPerNode); for (int i = 0; i < 10; i++) proxy(randomGrid()).increment(); @@ -104,11 +108,7 @@ public class GridServiceReassignmentSelfTest extends GridServiceProcessorAbstrac if (grow) { assert startedGrids.size() < maxTopSize; - int gridIdx = nextAvailableIdx(startedGrids, maxTopSize, rnd); - - startGrid(gridIdx); - - startedGrids.add(gridIdx); + startRandomNodesMultithreaded(maxTopSize, rnd, startedGrids); if (startedGrids.size() == maxTopSize) grow = false; @@ -135,7 +135,7 @@ public class GridServiceReassignmentSelfTest extends GridServiceProcessorAbstrac } } finally { - grid(F.first(startedGrids)).services().cancel("testService"); + grid(F.first(startedGrids)).services().cancel(SERVICE_NAME); stopAllGrids(); @@ -158,7 +158,7 @@ public class GridServiceReassignmentSelfTest extends GridServiceProcessorAbstrac IgniteInternalCache<GridServiceAssignmentsKey, GridServiceAssignments> cache = grid.utilityCache(); - GridServiceAssignments assignments = cache.get(new GridServiceAssignmentsKey("testService")); + GridServiceAssignments assignments = cache.get(new GridServiceAssignmentsKey(SERVICE_NAME)); Collection<UUID> nodes = F.viewReadOnly(grid.cluster().nodes(), F.node2id()); @@ -186,6 +186,8 @@ public class GridServiceReassignmentSelfTest extends 
GridServiceProcessorAbstrac if (total > 0) assertTrue("Total number of services limit exceeded [sum=" + sum + ", assigns=" + assignments.assigns() + ']', sum <= total); + else + assertEquals("Reassign per node failed.", nodes.size(), assignments.assigns().size()); if (!lastTry && proxy(grid).get() != 10) return false; @@ -196,6 +198,29 @@ public class GridServiceReassignmentSelfTest extends GridServiceProcessorAbstrac } /** + * Start 1, 2 or 3 random nodes simultaneously. + * + * @param limit Cluster size limit. + * @param rnd Random generator. + * @param grids Collection with indexes of running nodes. + * @throws Exception If failed. + */ + private void startRandomNodesMultithreaded(int limit, Random rnd, Collection<Integer> grids) throws Exception { + int cnt = rnd.nextInt(Math.min(limit - grids.size(), 3)) + 1; + + for (int i = 1; i <= cnt; i++) { + int gridIdx = nextAvailableIdx(grids, limit, rnd); + + if (i == cnt) + startGrid(gridIdx); + else + GridTestUtils.runAsync(() -> startGrid(gridIdx)); + + grids.add(gridIdx); + } + } + + /** * Gets next available index. * * @param startedGrids Indexes for started grids.
http://git-wip-us.apache.org/repos/asf/ignite/blob/efa32692/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java index 8116d1b..865f121 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java @@ -17,10 +17,16 @@ package org.apache.ignite.internal.processors.service; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.services.Service; @@ -29,6 +35,7 @@ import org.apache.ignite.services.ServiceContext; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridStringLogger; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -42,6 +49,12 @@ public class IgniteServiceReassignmentTest extends GridCommonAbstractTest { /** */ private ServiceConfiguration srvcCfg; + /** */ + 
private boolean useStrLog; + + /** */ + private List<IgniteLogger> strLoggers = new ArrayList<>(); + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -51,6 +64,16 @@ public class IgniteServiceReassignmentTest extends GridCommonAbstractTest { if (srvcCfg != null) cfg.setServiceConfiguration(srvcCfg); + if (useStrLog) { + GridStringLogger strLog = new GridStringLogger(false, cfg.getGridLogger()); + + strLog.logLength(100 * 1024); + + cfg.setGridLogger(strLog); + + strLoggers.add(strLog); + } + return cfg; } @@ -165,6 +188,62 @@ public class IgniteServiceReassignmentTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testZombieAssignmentsCleanup() throws Exception { + useStrLog = true; + + final int nodesCnt = 2; + final int maxSvc = 30; + + try { + startGridsMultiThreaded(nodesCnt); + + IgniteEx ignite = grid(0); + + IgniteInternalCache<GridServiceAssignmentsKey, Object> sysCache = ignite.utilityCache(); + + List<GridServiceAssignmentsKey> zombieAssignmentsKeys = new ArrayList<>(maxSvc); + + // Adding some assignments without deployments. + for (int i = 0; i < maxSvc; i++) { + String name = "svc-" + i; + + ServiceConfiguration svcCfg = new ServiceConfiguration(); + + svcCfg.setName(name); + + GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(name); + + UUID nodeId = grid(i % nodesCnt).localNode().id(); + + sysCache.put(key, new GridServiceAssignments(svcCfg, nodeId, ignite.cluster().topologyVersion())); + + zombieAssignmentsKeys.add(key); + } + + // Simulate exchange with merge. + GridTestUtils.runAsync(() -> startGrid(nodesCnt)); + GridTestUtils.runAsync(() -> startGrid(nodesCnt + 1)); + startGrid(nodesCnt + 2); + + awaitPartitionMapExchange(); + + // Checking that all our assignments were removed.
+ for (GridServiceAssignmentsKey key : zombieAssignmentsKeys) + assertNull("Found assignment for undeployed service " + key.name(), sysCache.get(key)); + + for (IgniteLogger logger : strLoggers) + assertFalse(logger.toString().contains("Getting affinity for topology version earlier than affinity is " + + "calculated")); + } finally { + useStrLog = false; + + strLoggers.clear(); + } + } + + /** * @param node Node. * @throws Exception If failed. */
