Repository: ignite
Updated Branches:
  refs/heads/master a303010e0 -> 523900a0c


IGNITE-8746 EVT_CACHE_REBALANCE_PART_DATA_LOST event received twice on the coordinator node - Fixes #4242.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/523900a0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/523900a0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/523900a0

Branch: refs/heads/master
Commit: 523900a0c56ae1ce2f9ff56368c0159983645af2
Parents: a303010
Author: pvinokurov <vinokurov.pa...@gmail.com>
Authored: Mon Jul 2 18:33:55 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Mon Jul 2 18:33:55 2018 +0300

----------------------------------------------------------------------
 .../dht/GridDhtPartitionTopologyImpl.java       |  22 +-
 .../IgniteCachePartitionLossPolicySelfTest.java | 246 ++++++++++++++-----
 2 files changed, 205 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/523900a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index ac338ae..89cd4c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -107,6 +107,9 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     /** Node to partition map. */
     private GridDhtPartitionFullMap node2part;
 
+    /** Partitions map for left nodes. */
+    private GridDhtPartitionFullMap leftNode2Part = new 
GridDhtPartitionFullMap();
+
     /** */
     private final Map<Integer, Set<UUID>> diffFromAffinity = new HashMap<>();
 
@@ -1995,6 +1998,15 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
                     assert plc != null;
 
+                    Set<Integer> recentlyLost = new HashSet<>();
+
+                    for (Map.Entry<UUID, GridDhtPartitionMap> leftEntry : 
leftNode2Part.entrySet()) {
+                        for (Map.Entry<Integer, GridDhtPartitionState> entry : 
leftEntry.getValue().entrySet()) {
+                            if (entry.getValue() == OWNING)
+                                recentlyLost.add(entry.getKey());
+                        }
+                    }
+
                     // Update partition state on all nodes.
                     for (Integer part : lost) {
                         long updSeq = updateSeq.incrementAndGet();
@@ -2002,6 +2014,9 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                         GridDhtLocalPartition locPart = localPartition(part, 
resTopVer, false, true);
 
                         if (locPart != null) {
+                            if (locPart.state() == LOST)
+                                continue;
+
                             boolean marked = plc == PartitionLossPolicy.IGNORE 
? locPart.own() : locPart.markLost();
 
                             if (marked)
@@ -2020,7 +2035,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                             }
                         }
 
-                        if 
(grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+                        if (recentlyLost.contains(part) && 
grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
                             grp.addRebalanceEvent(part,
                                 EVT_CACHE_REBALANCE_PART_DATA_LOST,
                                 discoEvt.eventNode(),
@@ -2033,6 +2048,8 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                         grp.needsRecovery(true);
                 }
 
+                leftNode2Part.clear();
+
                 return changed;
             }
             finally {
@@ -2452,6 +2469,9 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
             GridDhtPartitionMap parts = node2part.remove(nodeId);
 
+            if (parts != null)
+                leftNode2Part.put(nodeId, parts);
+
             if (!grp.isReplicated()) {
                 if (parts != null) {
                     for (Integer p : parts.keySet()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/523900a0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
index 7f35ddb..f4660fa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
@@ -17,34 +17,41 @@
 
 package org.apache.ignite.internal.processors.cache.distributed;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.PartitionLossPolicy;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.CacheRebalancingEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestDelayingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
-import javax.cache.CacheException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
@@ -64,18 +71,37 @@ public class IgniteCachePartitionLossPolicySelfTest extends 
GridCommonAbstractTe
     /** */
     private static final String CACHE_NAME = "partitioned";
 
+    /** */
+    private int backups = 0;
+
+    /** */
+    private final AtomicBoolean delayPartExchange = new AtomicBoolean(false);
+
+    /** */
+    private final TopologyChanger killSingleNode = new TopologyChanger(false, 
Arrays.asList(3), Arrays.asList(0, 1, 2, 4));
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
 
+        cfg.setCommunicationSpi(new TestDelayingCommunicationSpi() {
+            @Override protected boolean delayMessage(Message msg, 
GridIoMessage ioMsg) {
+                return delayPartExchange.get() && (msg instanceof 
GridDhtPartitionsFullMessage || msg instanceof 
GridDhtPartitionsAbstractMessage);
+            }
+
+            @Override protected int delayMillis() {
+                return 500;
+            }
+        });
+
         cfg.setClientMode(client);
 
         CacheConfiguration<Integer, Integer> cacheCfg = new 
CacheConfiguration<>(CACHE_NAME);
 
         cacheCfg.setCacheMode(PARTITIONED);
-        cacheCfg.setBackups(0);
+        cacheCfg.setBackups(backups);
         cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
         cacheCfg.setPartitionLossPolicy(partLossPlc);
         cacheCfg.setAffinity(new RendezvousAffinityFunction(false, 32));
@@ -90,13 +116,22 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
         stopAllGrids();
     }
 
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        delayPartExchange.set(false);
+
+        backups = 0;
+    }
+
     /**
      * @throws Exception if failed.
      */
     public void testReadOnlySafe() throws Exception {
         partLossPlc = PartitionLossPolicy.READ_ONLY_SAFE;
 
-        checkLostPartition(false, true);
+        checkLostPartition(false, true, killSingleNode);
     }
 
     /**
@@ -105,7 +140,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends 
GridCommonAbstractTe
     public void testReadOnlyAll() throws Exception {
         partLossPlc = PartitionLossPolicy.READ_ONLY_ALL;
 
-        checkLostPartition(false, false);
+        checkLostPartition(false, false, killSingleNode);
     }
 
     /**
@@ -114,7 +149,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends 
GridCommonAbstractTe
     public void testReadWriteSafe() throws Exception {
         partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
 
-        checkLostPartition(true, true);
+        checkLostPartition(true, true, killSingleNode);
     }
 
     /**
@@ -123,16 +158,57 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
     public void testReadWriteAll() throws Exception {
         partLossPlc = PartitionLossPolicy.READ_WRITE_ALL;
 
-        checkLostPartition(true, false);
+        checkLostPartition(true, false, killSingleNode);
     }
 
     /**
      * @throws Exception if failed.
      */
-    public void testIgnore() throws Exception {
+    public void testReadWriteSafeAfterKillTwoNodes() throws Exception {
+        partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
+
+        checkLostPartition(true, true, new TopologyChanger(false, 
Arrays.asList(3, 2), Arrays.asList(0, 1, 4)));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testReadWriteSafeAfterKillCrd() throws Exception {
+        partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
+
+        checkLostPartition(true, true, new TopologyChanger(true, 
Arrays.asList(3, 0), Arrays.asList(1, 2, 4)));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testReadWriteSafeWithBackups() throws Exception {
+        partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
+
+        backups = 1;
+
+        checkLostPartition(true, true, new TopologyChanger(true, 
Arrays.asList(3, 2), Arrays.asList(0, 1, 4)));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testReadWriteSafeWithBackupsAfterKillCrd() throws Exception {
+        partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
+
+        backups = 1;
+
+        checkLostPartition(true, true, new TopologyChanger(true, 
Arrays.asList(3, 0), Arrays.asList(1, 2, 4)));
+    }
+
+    /**
+     * @param topChanger topology changer.
+     * @throws Exception if failed.
+     */
+    public void testIgnore(TopologyChanger topChanger) throws Exception {
         fail("https://issues.apache.org/jira/browse/IGNITE-5078";);
 
-        prepareTopology();
+        topChanger.changeTopology();
 
         for (Ignite ig : G.allGrids()) {
             IgniteCache<Integer, Integer> cache = ig.cache(CACHE_NAME);
@@ -154,12 +230,13 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
     /**
      * @param canWrite {@code True} if writes are allowed.
      * @param safe {@code True} if lost partition should trigger exception.
+     * @param topChanger topology changer.
      * @throws Exception if failed.
      */
-    private void checkLostPartition(boolean canWrite, boolean safe) throws 
Exception {
+    private void checkLostPartition(boolean canWrite, boolean safe, 
TopologyChanger topChanger) throws Exception {
         assert partLossPlc != null;
 
-        int part = prepareTopology();
+        int part = topChanger.changeTopology();
 
         // Wait for all grids (servers and client) have same topology version
         // to make sure that all nodes received map with lost partition.
@@ -204,7 +281,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends 
GridCommonAbstractTe
         for (Ignite ig : G.allGrids())
             verifyCacheOps(canWrite, safe, part, ig);
 
-        ignite(0).resetLostPartitions(Collections.singletonList(CACHE_NAME));
+        ignite(4).resetLostPartitions(Collections.singletonList(CACHE_NAME));
 
         awaitPartitionMapExchange(true, true, null);
 
@@ -282,75 +359,120 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
     }
 
     /**
-     * @return Lost partition ID.
-     * @throws Exception If failed.
+     * @param nodes List of nodes to find partition.
+     * @return Partition id that isn't primary or backup for specified nodes.
      */
-    private int prepareTopology() throws Exception {
-        startGrids(4);
+    protected Integer noPrimaryOrBackupPartition(List<Integer> nodes) {
+        Affinity<Object> aff = ignite(4).affinity(CACHE_NAME);
+
+        Integer part;
+
+        for (int i = 0; i < aff.partitions(); i++) {
+            part = i;
+
+            for (Integer id : nodes) {
+                if (aff.isPrimaryOrBackup(grid(id).cluster().localNode(), i)) {
+                    part = null;
 
-        Affinity<Object> aff = ignite(0).affinity(CACHE_NAME);
+                    break;
+                }
+            }
 
-        for (int i = 0; i < aff.partitions(); i++)
-            ignite(0).cache(CACHE_NAME).put(i, i);
+            if (part != null)
+                return part;
 
-        client = true;
+        }
+
+        return null;
+    }
 
-        startGrid(4);
+    /** */
+    class TopologyChanger {
+        /** Flag to delay partition exchange */
+        private boolean delayExchange;
+
+        /** List of nodes to kill */
+        private List<Integer> killNodes;
+
+        /** List of nodes to be alive */
+        private List<Integer> aliveNodes;
+
+        /**
+         * @param delayExchange Flag for delay partition exchange.
+         * @param killNodes List of nodes to kill.
+         * @param aliveNodes List of nodes to be alive.
+         */
+        public TopologyChanger(boolean delayExchange, List<Integer> killNodes, 
List<Integer> aliveNodes) {
+            this.delayExchange = delayExchange;
+            this.killNodes = killNodes;
+            this.aliveNodes = aliveNodes;
+        }
 
-        client = false;
+        /**
+         * @return Lost partition ID.
+         * @throws Exception If failed.
+         */
+        protected int changeTopology() throws Exception {
+            startGrids(4);
 
-        for (int i = 0; i < 5; i++)
-            info(">>> Node [idx=" + i + ", nodeId=" + 
ignite(i).cluster().localNode().id() + ']');
+            Affinity<Object> aff = ignite(0).affinity(CACHE_NAME);
 
-        awaitPartitionMapExchange();
+            for (int i = 0; i < aff.partitions(); i++)
+                ignite(0).cache(CACHE_NAME).put(i, i);
 
-        ClusterNode killNode = ignite(3).cluster().localNode();
+            client = true;
 
-        int part = -1;
+            startGrid(4);
 
-        for (int i = 0; i < aff.partitions(); i++) {
-            if (aff.isPrimary(killNode, i)) {
-                part = i;
+            client = false;
 
-                break;
-            }
-        }
+            for (int i = 0; i < 5; i++)
+                info(">>> Node [idx=" + i + ", nodeId=" + 
ignite(i).cluster().localNode().id() + ']');
 
-        if (part == -1)
-            throw new IllegalStateException("No partition on node: " + 
killNode);
+            awaitPartitionMapExchange();
 
-        final CountDownLatch[] partLost = new CountDownLatch[3];
+            final Integer part = noPrimaryOrBackupPartition(aliveNodes);
 
-        // Check events.
-        for (int i = 0; i < 3; i++) {
-            final CountDownLatch latch = new CountDownLatch(1);
-            partLost[i] = latch;
+            if (part == null)
+                throw new IllegalStateException("No partition on nodes: " + 
killNodes);
 
-            final int part0 = part;
+            final List<Semaphore> partLost = new ArrayList<>();
 
-            grid(i).events().localListen(new P1<Event>() {
-                @Override public boolean apply(Event evt) {
-                    assert evt.type() == 
EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
+            for (int i : aliveNodes) {
+                final Semaphore sem = new Semaphore(0);
+                partLost.add(sem);
 
-                    CacheRebalancingEvent cacheEvt = 
(CacheRebalancingEvent)evt;
+                grid(i).events().localListen(new P1<Event>() {
+                    @Override public boolean apply(Event evt) {
+                        assert evt.type() == 
EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
 
-                    if (cacheEvt.partition() == part0 && F.eq(CACHE_NAME, 
cacheEvt.cacheName())) {
-                        latch.countDown();
+                        CacheRebalancingEvent cacheEvt = 
(CacheRebalancingEvent)evt;
 
-                        // Auto-unsubscribe.
-                        return false;
+                        if (cacheEvt.partition() == part && F.eq(CACHE_NAME, 
cacheEvt.cacheName()))
+                            sem.release();
+
+                        return true;
                     }
+                }, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
 
-                    return true;
-                }
-            }, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
-        }
+            }
+
+            if (delayExchange)
+                delayPartExchange.set(true);
 
-        ignite(3).close();
+            for (Integer node : killNodes)
+                grid(node).close();
 
-        for (CountDownLatch latch : partLost)
-            assertTrue("Failed to wait for partition LOST event", 
latch.await(10, TimeUnit.SECONDS));
+            delayPartExchange.set(false);
 
-        return part;
+            for (Semaphore sem : partLost)
+                assertTrue("Failed to wait for partition LOST event", 
sem.tryAcquire(1, 10L, TimeUnit.SECONDS));
+
+            for (Semaphore sem : partLost)
+                assertFalse("Partition LOST event raised twice", 
sem.tryAcquire(1, 1L, TimeUnit.SECONDS));
+
+            return part;
+        }
     }
+
 }

Reply via email to