This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 223281b  IGNITE-12546 Prevent partitions owned by other nodes switch 
their state to MOVING due to counter difference on node join. - Fixes #7273.
223281b is described below

commit 223281be93f61705530d8e88ac7eee3fe39453a9
Author: Aleksei Scherbakov <[email protected]>
AuthorDate: Mon Jan 20 14:41:06 2020 +0300

    IGNITE-12546 Prevent partitions owned by other nodes switch their state to 
MOVING due to counter difference on node join. - Fixes #7273.
    
    Signed-off-by: Aleksei Scherbakov <[email protected]>
---
 .../dht/topology/GridClientPartitionTopology.java  |  20 +--
 .../dht/topology/GridDhtPartitionTopology.java     |   8 +-
 .../dht/topology/GridDhtPartitionTopologyImpl.java |  40 +++--
 ...IgnitePdsSpuriousRebalancingOnNodeJoinTest.java | 170 +++++++++++++++++++++
 .../ignite/testsuites/IgnitePdsTestSuite4.java     |   2 +
 5 files changed, 217 insertions(+), 23 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index c9c090b..7a988a5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -1163,10 +1163,12 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public Map<UUID, Set<Integer>> resetOwners(Map<Integer, 
Set<UUID>> ownersByUpdCounters,
-        Set<Integer> haveHistory,
-        GridDhtPartitionsExchangeFuture exchFut) {
-        Map<UUID, Set<Integer>> result = new HashMap<>();
+    @Override public Map<UUID, Set<Integer>> resetOwners(
+        Map<Integer, Set<UUID>> ownersByUpdCounters,
+        Set<Integer> haveHist,
+        GridDhtPartitionsExchangeFuture exchFut
+    ) {
+        Map<UUID, Set<Integer>> res = new HashMap<>();
 
         lock.writeLock().lock();
 
@@ -1190,19 +1192,19 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
 
                         partMap.updateSequence(partMap.updateSequence() + 1, 
partMap.topologyVersion());
 
-                        result.computeIfAbsent(remoteNodeId, n -> new 
HashSet<>());
-                        result.get(remoteNodeId).add(part);
+                        res.computeIfAbsent(remoteNodeId, n -> new 
HashSet<>());
+                        res.get(remoteNodeId).add(part);
                     }
                 }
             }
 
-            for (Map.Entry<UUID, Set<Integer>> entry : result.entrySet()) {
+            for (Map.Entry<UUID, Set<Integer>> entry : res.entrySet()) {
                 UUID nodeId = entry.getKey();
                 Set<Integer> partsToRebalance = entry.getValue();
 
                 if (!partsToRebalance.isEmpty()) {
                     Set<Integer> historical = partsToRebalance.stream()
-                        .filter(haveHistory::contains)
+                        .filter(haveHist::contains)
                         .collect(Collectors.toSet());
 
                     // Filter out partitions having WAL history.
@@ -1224,7 +1226,7 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
             lock.writeLock().unlock();
         }
 
-        return result;
+        return res;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
index 2a66224..1097294 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
@@ -424,17 +424,19 @@ public interface GridDhtPartitionTopology {
     public boolean rebalanceFinished(AffinityTopologyVersion topVer);
 
     /**
-     * Calculates nodes and partitions which have non-actual state and must be 
rebalanced.
+     * Calculates nodes and partitions which have an outdated state (based on 
LWM value) and must be rebalanced.
      * State of all current owners that aren't contained in the given {@code 
ownersByUpdCounters} will be reset to MOVING.
      * Called on coordinator during assignment of partition states.
      *
      * @param ownersByUpdCounters Map (partition, set of node IDs that have 
most actual state about partition
      *                            (update counter is maximal) and should hold 
OWNING state for such partition).
-     * @param haveHistory Set of partitions which have WAL history to 
rebalance.
+     * @param haveHist Set of partitions which have WAL history to rebalance.
      * @param exchFut Exchange future for operation.
      * @return Map (nodeId, set of partitions that should be rebalanced 
<b>fully</b> by this node).
      */
-    public Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> 
ownersByUpdCounters, Set<Integer> haveHistory,
+    public Map<UUID, Set<Integer>> resetOwners(
+        Map<Integer, Set<UUID>> ownersByUpdCounters,
+        Set<Integer> haveHist,
         GridDhtPartitionsExchangeFuture exchFut);
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 5ee3243..4156878 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -74,6 +74,7 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static 
org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.ExchangeType.ALL;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
@@ -2268,10 +2269,20 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public Map<UUID, Set<Integer>> resetOwners(Map<Integer, 
Set<UUID>> ownersByUpdCounters,
-        Set<Integer> haveHistory,
+    @Override public Map<UUID, Set<Integer>> resetOwners(
+        Map<Integer, Set<UUID>> ownersByUpdCounters,
+        Set<Integer> haveHist,
         GridDhtPartitionsExchangeFuture exchFut) {
-        Map<UUID, Set<Integer>> result = new HashMap<>();
+        Map<UUID, Set<Integer>> res = new HashMap<>();
+
+        List<DiscoveryEvent> evts = exchFut.events().events();
+
+        Set<UUID> joinedNodes = U.newHashSet(evts.size());
+
+        for (DiscoveryEvent evt : evts) {
+            if (evt.type() == EVT_NODE_JOINED)
+                joinedNodes.add(evt.eventNode().id());
+        }
 
         ctx.database().checkpointReadLock();
 
@@ -2282,6 +2293,8 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
             try {
                 // First process local partitions.
+                UUID locNodeId = ctx.localNodeId();
+
                 for (Map.Entry<Integer, Set<UUID>> entry : 
ownersByUpdCounters.entrySet()) {
                     int part = entry.getKey();
                     Set<UUID> newOwners = entry.getValue();
@@ -2291,10 +2304,11 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                     if (locPart == null || locPart.state() != OWNING)
                         continue;
 
-                    if (!newOwners.contains(ctx.localNodeId())) {
-                        rebalancePartition(part, !haveHistory.contains(part), 
exchFut);
+                    // Partition state should be mutated only on joining nodes 
if any are present in the exchange.
+                    if (joinedNodes.isEmpty() && 
!newOwners.contains(locNodeId)) {
+                        rebalancePartition(part, !haveHist.contains(part), 
exchFut);
 
-                        result.computeIfAbsent(ctx.localNodeId(), n -> new 
HashSet<>()).add(part);
+                        res.computeIfAbsent(locNodeId, n -> new 
HashSet<>()).add(part);
                     }
                 }
 
@@ -2305,6 +2319,10 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
                     for (Map.Entry<UUID, GridDhtPartitionMap> remotes : 
node2part.entrySet()) {
                         UUID remoteNodeId = remotes.getKey();
+
+                        if (!joinedNodes.isEmpty() && 
!joinedNodes.contains(remoteNodeId))
+                            continue;
+
                         GridDhtPartitionMap partMap = remotes.getValue();
 
                         GridDhtPartitionState state = partMap.get(part);
@@ -2317,15 +2335,15 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
                             partMap.updateSequence(partMap.updateSequence() + 
1, partMap.topologyVersion());
 
-                            if (partMap.nodeId().equals(ctx.localNodeId()))
+                            if (partMap.nodeId().equals(locNodeId))
                                 
updateSeq.setIfGreater(partMap.updateSequence());
 
-                            result.computeIfAbsent(remoteNodeId, n -> new 
HashSet<>()).add(part);
+                            res.computeIfAbsent(remoteNodeId, n -> new 
HashSet<>()).add(part);
                         }
                     }
                 }
 
-                for (Map.Entry<UUID, Set<Integer>> entry : result.entrySet()) {
+                for (Map.Entry<UUID, Set<Integer>> entry : res.entrySet()) {
                     UUID nodeId = entry.getKey();
                     Set<Integer> rebalancedParts = entry.getValue();
 
@@ -2333,7 +2351,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
                     if (!rebalancedParts.isEmpty()) {
                         Set<Integer> historical = rebalancedParts.stream()
-                            .filter(haveHistory::contains)
+                            .filter(haveHist::contains)
                             .collect(Collectors.toSet());
 
                         // Filter out partitions having WAL history.
@@ -2371,7 +2389,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
             ctx.database().checkpointReadUnlock();
         }
 
-        return result;
+        return res;
     }
 
     /**
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
new file mode 100644
index 0000000..b8f4b98
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import 
org.apache.ignite.internal.processors.cache.PartitionTxUpdateCounterImpl;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ * Tests that rebalancing does not happen on node join for partitions 
belonging to the coordinator, even if counters differ.
+ */
+public class IgnitePdsSpuriousRebalancingOnNodeJoinTest extends 
GridCommonAbstractTest {
+    /** Partition count passed to the cache's {@link RendezvousAffinityFunction}. */
+    private static final int PARTS = 64;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        DataStorageConfiguration dsCfg = new DataStorageConfiguration();
+        dsCfg.setWalSegmentSize(4 * 1024 * 1024);
+        dsCfg.setPageSize(1024);
+
+        dsCfg.setDefaultDataRegionConfiguration(new DataRegionConfiguration().
+            setInitialSize(100L * 1024 * 1024).
+            setMaxSize(200L * 1024 * 1024).
+            setPersistenceEnabled(true));
+
+        cfg.setDataStorageConfiguration(dsCfg);
+
+        cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME).
+            setAtomicityMode(TRANSACTIONAL).
+            setCacheMode(REPLICATED).
+            setAffinity(new RendezvousAffinityFunction(false, PARTS)));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** Restarts node 2 after decrementing one partition counter on the coordinator and asserts that no demand messages were recorded on the coordinator. */
+    @SuppressWarnings("ConstantConditions")
+    @Test
+    public void testNoSpuriousRebalancing() throws Exception {
+        try {
+            IgniteEx crd = startGrids(2);
+
+            crd.cluster().active(true);
+            crd.cluster().baselineAutoAdjustEnabled(false);
+
+            List<Integer> moving = movingKeysAfterJoin(crd, 
DEFAULT_CACHE_NAME, 10);
+
+            int[] primParts = 
crd.affinity(DEFAULT_CACHE_NAME).primaryPartitions(crd.localNode());
+
+            Arrays.sort(primParts);
+
+            int primChangePartId = -1; // This partition will be new primary 
on joining node.
+
+            for (int id : moving) {
+                if (Arrays.binarySearch(primParts, id) >= 0) {
+                    primChangePartId = id;
+
+                    break;
+                }
+            }
+
+            assertTrue(primChangePartId != -1);
+
+            startGrid(2);
+
+            resetBaselineTopology(); // Trigger partition movement.
+
+            awaitPartitionMapExchange();
+
+            GridCacheContext<Object, Object> ctx = 
crd.cachex(DEFAULT_CACHE_NAME).context();
+            AffinityAssignment a0 = ctx.affinity().assignment(new 
AffinityTopologyVersion(3, 1));
+
+            List<ClusterNode> nodes = a0.get(primChangePartId);
+
+            assertEquals(3, nodes.size());
+
+            assertEquals(crd.configuration().getConsistentId(), 
nodes.get(0).consistentId());
+
+            awaitPartitionMapExchange();
+
+            for (int k = 0; k < PARTS * 2; k++)
+                crd.cache(DEFAULT_CACHE_NAME).put(k, k);
+
+            forceCheckpoint();
+
+            stopGrid(2);
+
+            // Forge the counter on the coordinator for the switching partition: the reflectively read "cntr" field is decremented by one.
+            GridDhtLocalPartition part = 
ctx.topology().localPartition(primChangePartId);
+
+            assertNotNull(part);
+
+            PartitionTxUpdateCounterImpl cntr0 = 
(PartitionTxUpdateCounterImpl)part.dataStore().partUpdateCounter();
+
+            AtomicLong cntr = U.field(cntr0, "cntr");
+
+            cntr.set(cntr.get() - 1);
+
+            TestRecordingCommunicationSpi.spi(crd).record((node, msg) -> msg 
instanceof GridDhtPartitionDemandMessage);
+
+            startGrid(2);
+
+            awaitPartitionMapExchange();
+
+            // Expecting no rebalancing: the coordinator's recording SPI must hold no demand messages.
+            List<Object> msgs = 
TestRecordingCommunicationSpi.spi(crd).recordedMessages(true);
+
+            assertTrue("Rebalancing is not expected " + msgs, msgs.isEmpty());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index 2254956..00e77f9 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -29,6 +29,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuo
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRemoveDuringRebalancingTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRestartAfterFailedToWriteMetaPageTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsSpuriousRebalancingOnNodeJoinTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTaskCancelingTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheWalDisabledOnRebalancingTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageEvictionDuringPartitionClearTest;
@@ -79,6 +80,7 @@ public class IgnitePdsTestSuite4 {
 
         GridTestUtils.addTestIfNeeded(suite, 
IgnitePdsRestartAfterFailedToWriteMetaPageTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
IgnitePdsRemoveDuringRebalancingTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, 
IgnitePdsSpuriousRebalancingOnNodeJoinTest.class, ignoredTests);
 
         // Page lock tracker tests.
         GridTestUtils.addTestIfNeeded(suite, PageLockTrackerManagerTest.class, 
ignoredTests);

Reply via email to