Repository: ignite
Updated Branches:
  refs/heads/master a303010e0 -> 523900a0c
IGNITE-8746 EVT_CACHE_REBALANCE_PART_DATA_LOST event received twice on the coordinator node - Fixes #4242. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/523900a0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/523900a0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/523900a0 Branch: refs/heads/master Commit: 523900a0c56ae1ce2f9ff56368c0159983645af2 Parents: a303010 Author: pvinokurov <vinokurov.pa...@gmail.com> Authored: Mon Jul 2 18:33:55 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Mon Jul 2 18:33:55 2018 +0300 ---------------------------------------------------------------------- .../dht/GridDhtPartitionTopologyImpl.java | 22 +- .../IgniteCachePartitionLossPolicySelfTest.java | 246 ++++++++++++++----- 2 files changed, 205 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/523900a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index ac338ae..89cd4c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -107,6 +107,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** Node to partition map. 
*/ private GridDhtPartitionFullMap node2part; + /** Partitions map for left nodes. */ + private GridDhtPartitionFullMap leftNode2Part = new GridDhtPartitionFullMap(); + /** */ private final Map<Integer, Set<UUID>> diffFromAffinity = new HashMap<>(); @@ -1995,6 +1998,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { assert plc != null; + Set<Integer> recentlyLost = new HashSet<>(); + + for (Map.Entry<UUID, GridDhtPartitionMap> leftEntry : leftNode2Part.entrySet()) { + for (Map.Entry<Integer, GridDhtPartitionState> entry : leftEntry.getValue().entrySet()) { + if (entry.getValue() == OWNING) + recentlyLost.add(entry.getKey()); + } + } + // Update partition state on all nodes. for (Integer part : lost) { long updSeq = updateSeq.incrementAndGet(); @@ -2002,6 +2014,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtLocalPartition locPart = localPartition(part, resTopVer, false, true); if (locPart != null) { + if (locPart.state() == LOST) + continue; + boolean marked = plc == PartitionLossPolicy.IGNORE ? 
locPart.own() : locPart.markLost(); if (marked) @@ -2020,7 +2035,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - if (grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) { + if (recentlyLost.contains(part) && grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) { grp.addRebalanceEvent(part, EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(), @@ -2033,6 +2048,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { grp.needsRecovery(true); } + leftNode2Part.clear(); + return changed; } finally { @@ -2452,6 +2469,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtPartitionMap parts = node2part.remove(nodeId); + if (parts != null) + leftNode2Part.put(nodeId, parts); + if (!grp.isReplicated()) { if (parts != null) { for (Integer p : parts.keySet()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/523900a0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java index 7f35ddb..f4660fa 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java @@ -17,34 +17,41 @@ package org.apache.ignite.internal.processors.cache.distributed; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Semaphore; +import 
java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.CacheRebalancingEvent; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.TestDelayingCommunicationSpi; +import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import javax.cache.CacheException; -import java.util.Collection; -import java.util.Collections; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static 
org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; @@ -64,18 +71,37 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe /** */ private static final String CACHE_NAME = "partitioned"; + /** */ + private int backups = 0; + + /** */ + private final AtomicBoolean delayPartExchange = new AtomicBoolean(false); + + /** */ + private final TopologyChanger killSingleNode = new TopologyChanger(false, Arrays.asList(3), Arrays.asList(0, 1, 2, 4)); + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + cfg.setCommunicationSpi(new TestDelayingCommunicationSpi() { + @Override protected boolean delayMessage(Message msg, GridIoMessage ioMsg) { + return delayPartExchange.get() && (msg instanceof GridDhtPartitionsFullMessage || msg instanceof GridDhtPartitionsAbstractMessage); + } + + @Override protected int delayMillis() { + return 500; + } + }); + cfg.setClientMode(client); CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(CACHE_NAME); cacheCfg.setCacheMode(PARTITIONED); - cacheCfg.setBackups(0); + cacheCfg.setBackups(backups); cacheCfg.setWriteSynchronizationMode(FULL_SYNC); cacheCfg.setPartitionLossPolicy(partLossPlc); cacheCfg.setAffinity(new RendezvousAffinityFunction(false, 32)); @@ -90,13 +116,22 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe stopAllGrids(); } + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + delayPartExchange.set(false); + + backups = 0; + } + /** * @throws Exception if failed. 
*/ public void testReadOnlySafe() throws Exception { partLossPlc = PartitionLossPolicy.READ_ONLY_SAFE; - checkLostPartition(false, true); + checkLostPartition(false, true, killSingleNode); } /** @@ -105,7 +140,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe public void testReadOnlyAll() throws Exception { partLossPlc = PartitionLossPolicy.READ_ONLY_ALL; - checkLostPartition(false, false); + checkLostPartition(false, false, killSingleNode); } /** @@ -114,7 +149,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe public void testReadWriteSafe() throws Exception { partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; - checkLostPartition(true, true); + checkLostPartition(true, true, killSingleNode); } /** @@ -123,16 +158,57 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe public void testReadWriteAll() throws Exception { partLossPlc = PartitionLossPolicy.READ_WRITE_ALL; - checkLostPartition(true, false); + checkLostPartition(true, false, killSingleNode); } /** * @throws Exception if failed. */ - public void testIgnore() throws Exception { + public void testReadWriteSafeAfterKillTwoNodes() throws Exception { + partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; + + checkLostPartition(true, true, new TopologyChanger(false, Arrays.asList(3, 2), Arrays.asList(0, 1, 4))); + } + + /** + * @throws Exception if failed. + */ + public void testReadWriteSafeAfterKillCrd() throws Exception { + partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; + + checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 0), Arrays.asList(1, 2, 4))); + } + + /** + * @throws Exception if failed. 
+ */ + public void testReadWriteSafeWithBackups() throws Exception { + partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; + + backups = 1; + + checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 2), Arrays.asList(0, 1, 4))); + } + + /** + * @throws Exception if failed. + */ + public void testReadWriteSafeWithBackupsAfterKillCrd() throws Exception { + partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; + + backups = 1; + + checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 0), Arrays.asList(1, 2, 4))); + } + + /** + * @param topChanger topology changer. + * @throws Exception if failed. + */ + public void testIgnore(TopologyChanger topChanger) throws Exception { fail("https://issues.apache.org/jira/browse/IGNITE-5078"); - prepareTopology(); + topChanger.changeTopology(); for (Ignite ig : G.allGrids()) { IgniteCache<Integer, Integer> cache = ig.cache(CACHE_NAME); @@ -154,12 +230,13 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe /** * @param canWrite {@code True} if writes are allowed. * @param safe {@code True} if lost partition should trigger exception. + * @param topChanger topology changer. * @throws Exception if failed. */ - private void checkLostPartition(boolean canWrite, boolean safe) throws Exception { + private void checkLostPartition(boolean canWrite, boolean safe, TopologyChanger topChanger) throws Exception { assert partLossPlc != null; - int part = prepareTopology(); + int part = topChanger.changeTopology(); // Wait for all grids (servers and client) have same topology version // to make sure that all nodes received map with lost partition. 
@@ -204,7 +281,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe for (Ignite ig : G.allGrids()) verifyCacheOps(canWrite, safe, part, ig); - ignite(0).resetLostPartitions(Collections.singletonList(CACHE_NAME)); + ignite(4).resetLostPartitions(Collections.singletonList(CACHE_NAME)); awaitPartitionMapExchange(true, true, null); @@ -282,75 +359,120 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe } /** - * @return Lost partition ID. - * @throws Exception If failed. + * @param nodes List of nodes to find partition. + * @return Partition id that isn't primary or backup for specified nodes. */ - private int prepareTopology() throws Exception { - startGrids(4); + protected Integer noPrimaryOrBackupPartition(List<Integer> nodes) { + Affinity<Object> aff = ignite(4).affinity(CACHE_NAME); + + Integer part; + + for (int i = 0; i < aff.partitions(); i++) { + part = i; + + for (Integer id : nodes) { + if (aff.isPrimaryOrBackup(grid(id).cluster().localNode(), i)) { + part = null; - Affinity<Object> aff = ignite(0).affinity(CACHE_NAME); + break; + } + } - for (int i = 0; i < aff.partitions(); i++) - ignite(0).cache(CACHE_NAME).put(i, i); + if (part != null) + return part; - client = true; + } + + return null; + } - startGrid(4); + /** */ + class TopologyChanger { + /** Flag to delay partition exchange */ + private boolean delayExchange; + + /** List of nodes to kill */ + private List<Integer> killNodes; + + /** List of nodes to be alive */ + private List<Integer> aliveNodes; + + /** + * @param delayExchange Flag for delay partition exchange. + * @param killNodes List of nodes to kill. + * @param aliveNodes List of nodes to be alive. + */ + public TopologyChanger(boolean delayExchange, List<Integer> killNodes, List<Integer> aliveNodes) { + this.delayExchange = delayExchange; + this.killNodes = killNodes; + this.aliveNodes = aliveNodes; + } - client = false; + /** + * @return Lost partition ID. 
+ * @throws Exception If failed. + */ + protected int changeTopology() throws Exception { + startGrids(4); - for (int i = 0; i < 5; i++) - info(">>> Node [idx=" + i + ", nodeId=" + ignite(i).cluster().localNode().id() + ']'); + Affinity<Object> aff = ignite(0).affinity(CACHE_NAME); - awaitPartitionMapExchange(); + for (int i = 0; i < aff.partitions(); i++) + ignite(0).cache(CACHE_NAME).put(i, i); - ClusterNode killNode = ignite(3).cluster().localNode(); + client = true; - int part = -1; + startGrid(4); - for (int i = 0; i < aff.partitions(); i++) { - if (aff.isPrimary(killNode, i)) { - part = i; + client = false; - break; - } - } + for (int i = 0; i < 5; i++) + info(">>> Node [idx=" + i + ", nodeId=" + ignite(i).cluster().localNode().id() + ']'); - if (part == -1) - throw new IllegalStateException("No partition on node: " + killNode); + awaitPartitionMapExchange(); - final CountDownLatch[] partLost = new CountDownLatch[3]; + final Integer part = noPrimaryOrBackupPartition(aliveNodes); - // Check events. - for (int i = 0; i < 3; i++) { - final CountDownLatch latch = new CountDownLatch(1); - partLost[i] = latch; + if (part == null) + throw new IllegalStateException("No partition on nodes: " + killNodes); - final int part0 = part; + final List<Semaphore> partLost = new ArrayList<>(); - grid(i).events().localListen(new P1<Event>() { - @Override public boolean apply(Event evt) { - assert evt.type() == EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; + for (int i : aliveNodes) { + final Semaphore sem = new Semaphore(0); + partLost.add(sem); - CacheRebalancingEvent cacheEvt = (CacheRebalancingEvent)evt; + grid(i).events().localListen(new P1<Event>() { + @Override public boolean apply(Event evt) { + assert evt.type() == EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; - if (cacheEvt.partition() == part0 && F.eq(CACHE_NAME, cacheEvt.cacheName())) { - latch.countDown(); + CacheRebalancingEvent cacheEvt = (CacheRebalancingEvent)evt; - // Auto-unsubscribe. 
- return false; + if (cacheEvt.partition() == part && F.eq(CACHE_NAME, cacheEvt.cacheName())) + sem.release(); + + return true; } + }, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST); - return true; - } - }, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST); - } + } + + if (delayExchange) + delayPartExchange.set(true); - ignite(3).close(); + for (Integer node : killNodes) + grid(node).close(); - for (CountDownLatch latch : partLost) - assertTrue("Failed to wait for partition LOST event", latch.await(10, TimeUnit.SECONDS)); + delayPartExchange.set(false); - return part; + for (Semaphore sem : partLost) + assertTrue("Failed to wait for partition LOST event", sem.tryAcquire(1, 10L, TimeUnit.SECONDS)); + + for (Semaphore sem : partLost) + assertFalse("Partition LOST event raised twice", sem.tryAcquire(1, 1L, TimeUnit.SECONDS)); + + return part; + } } + }