This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 223281b IGNITE-12546 Prevent partitions owned by other nodes switch
their state to MOVING due to counter difference on node join. - Fixes #7273.
223281b is described below
commit 223281be93f61705530d8e88ac7eee3fe39453a9
Author: Aleksei Scherbakov <[email protected]>
AuthorDate: Mon Jan 20 14:41:06 2020 +0300
IGNITE-12546 Prevent partitions owned by other nodes switch their state to
MOVING due to counter difference on node join. - Fixes #7273.
Signed-off-by: Aleksei Scherbakov <[email protected]>
---
.../dht/topology/GridClientPartitionTopology.java | 20 +--
.../dht/topology/GridDhtPartitionTopology.java | 8 +-
.../dht/topology/GridDhtPartitionTopologyImpl.java | 40 +++--
...IgnitePdsSpuriousRebalancingOnNodeJoinTest.java | 170 +++++++++++++++++++++
.../ignite/testsuites/IgnitePdsTestSuite4.java | 2 +
5 files changed, 217 insertions(+), 23 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index c9c090b..7a988a5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -1163,10 +1163,12 @@ public class GridClientPartitionTopology implements
GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public Map<UUID, Set<Integer>> resetOwners(Map<Integer,
Set<UUID>> ownersByUpdCounters,
- Set<Integer> haveHistory,
- GridDhtPartitionsExchangeFuture exchFut) {
- Map<UUID, Set<Integer>> result = new HashMap<>();
+ @Override public Map<UUID, Set<Integer>> resetOwners(
+ Map<Integer, Set<UUID>> ownersByUpdCounters,
+ Set<Integer> haveHist,
+ GridDhtPartitionsExchangeFuture exchFut
+ ) {
+ Map<UUID, Set<Integer>> res = new HashMap<>();
lock.writeLock().lock();
@@ -1190,19 +1192,19 @@ public class GridClientPartitionTopology implements
GridDhtPartitionTopology {
partMap.updateSequence(partMap.updateSequence() + 1,
partMap.topologyVersion());
- result.computeIfAbsent(remoteNodeId, n -> new
HashSet<>());
- result.get(remoteNodeId).add(part);
+ res.computeIfAbsent(remoteNodeId, n -> new
HashSet<>());
+ res.get(remoteNodeId).add(part);
}
}
}
- for (Map.Entry<UUID, Set<Integer>> entry : result.entrySet()) {
+ for (Map.Entry<UUID, Set<Integer>> entry : res.entrySet()) {
UUID nodeId = entry.getKey();
Set<Integer> partsToRebalance = entry.getValue();
if (!partsToRebalance.isEmpty()) {
Set<Integer> historical = partsToRebalance.stream()
- .filter(haveHistory::contains)
+ .filter(haveHist::contains)
.collect(Collectors.toSet());
// Filter out partitions having WAL history.
@@ -1224,7 +1226,7 @@ public class GridClientPartitionTopology implements
GridDhtPartitionTopology {
lock.writeLock().unlock();
}
- return result;
+ return res;
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
index 2a66224..1097294 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
@@ -424,17 +424,19 @@ public interface GridDhtPartitionTopology {
public boolean rebalanceFinished(AffinityTopologyVersion topVer);
/**
- * Calculates nodes and partitions which have non-actual state and must be
rebalanced.
+ * Calculates nodes and partitions which have non-actual state (based on
LWM value) and must be rebalanced.
* State of all current owners that aren't contained in the given {@code
ownersByUpdCounters} will be reset to MOVING.
* Called on coordinator during assignment of partition states.
*
* @param ownersByUpdCounters Map (partition, set of node IDs that have
most actual state about partition
* (update counter is maximal) and should hold
OWNING state for such partition).
- * @param haveHistory Set of partitions which have WAL history to
rebalance.
+ * @param haveHist Set of partitions which have WAL history to rebalance.
* @param exchFut Exchange future for operation.
* @return Map (nodeId, set of partitions that should be rebalanced
<b>fully</b> by this node).
*/
- public Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>>
ownersByUpdCounters, Set<Integer> haveHistory,
+ public Map<UUID, Set<Integer>> resetOwners(
+ Map<Integer, Set<UUID>> ownersByUpdCounters,
+ Set<Integer> haveHist,
GridDhtPartitionsExchangeFuture exchFut);
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 5ee3243..4156878 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -74,6 +74,7 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static
org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.ExchangeType.ALL;
import static
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
@@ -2268,10 +2269,20 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public Map<UUID, Set<Integer>> resetOwners(Map<Integer,
Set<UUID>> ownersByUpdCounters,
- Set<Integer> haveHistory,
+ @Override public Map<UUID, Set<Integer>> resetOwners(
+ Map<Integer, Set<UUID>> ownersByUpdCounters,
+ Set<Integer> haveHist,
GridDhtPartitionsExchangeFuture exchFut) {
- Map<UUID, Set<Integer>> result = new HashMap<>();
+ Map<UUID, Set<Integer>> res = new HashMap<>();
+
+ List<DiscoveryEvent> evts = exchFut.events().events();
+
+ Set<UUID> joinedNodes = U.newHashSet(evts.size());
+
+ for (DiscoveryEvent evt : evts) {
+ if (evt.type() == EVT_NODE_JOINED)
+ joinedNodes.add(evt.eventNode().id());
+ }
ctx.database().checkpointReadLock();
@@ -2282,6 +2293,8 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
try {
// First process local partitions.
+ UUID locNodeId = ctx.localNodeId();
+
for (Map.Entry<Integer, Set<UUID>> entry :
ownersByUpdCounters.entrySet()) {
int part = entry.getKey();
Set<UUID> newOwners = entry.getValue();
@@ -2291,10 +2304,11 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
if (locPart == null || locPart.state() != OWNING)
continue;
- if (!newOwners.contains(ctx.localNodeId())) {
- rebalancePartition(part, !haveHistory.contains(part),
exchFut);
+ // Partition state should be mutated only on joining nodes
if any exist for the exchange.
+ if (joinedNodes.isEmpty() &&
!newOwners.contains(locNodeId)) {
+ rebalancePartition(part, !haveHist.contains(part),
exchFut);
- result.computeIfAbsent(ctx.localNodeId(), n -> new
HashSet<>()).add(part);
+ res.computeIfAbsent(locNodeId, n -> new
HashSet<>()).add(part);
}
}
@@ -2305,6 +2319,10 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
for (Map.Entry<UUID, GridDhtPartitionMap> remotes :
node2part.entrySet()) {
UUID remoteNodeId = remotes.getKey();
+
+ if (!joinedNodes.isEmpty() &&
!joinedNodes.contains(remoteNodeId))
+ continue;
+
GridDhtPartitionMap partMap = remotes.getValue();
GridDhtPartitionState state = partMap.get(part);
@@ -2317,15 +2335,15 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
partMap.updateSequence(partMap.updateSequence() +
1, partMap.topologyVersion());
- if (partMap.nodeId().equals(ctx.localNodeId()))
+ if (partMap.nodeId().equals(locNodeId))
updateSeq.setIfGreater(partMap.updateSequence());
- result.computeIfAbsent(remoteNodeId, n -> new
HashSet<>()).add(part);
+ res.computeIfAbsent(remoteNodeId, n -> new
HashSet<>()).add(part);
}
}
}
- for (Map.Entry<UUID, Set<Integer>> entry : result.entrySet()) {
+ for (Map.Entry<UUID, Set<Integer>> entry : res.entrySet()) {
UUID nodeId = entry.getKey();
Set<Integer> rebalancedParts = entry.getValue();
@@ -2333,7 +2351,7 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
if (!rebalancedParts.isEmpty()) {
Set<Integer> historical = rebalancedParts.stream()
- .filter(haveHistory::contains)
+ .filter(haveHist::contains)
.collect(Collectors.toSet());
// Filter out partitions having WAL history.
@@ -2371,7 +2389,7 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
ctx.database().checkpointReadUnlock();
}
- return result;
+ return res;
}
/**
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
new file mode 100644
index 0000000..b8f4b98
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import
org.apache.ignite.internal.processors.cache.PartitionTxUpdateCounterImpl;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ * Tests that rebalancing does not happen on node join for partitions belonging to
the coordinator even if counters are different.
+ */
+public class IgnitePdsSpuriousRebalancingOnNodeJoinTest extends
GridCommonAbstractTest {
+ /** */
+ private static final int PARTS = 64;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ cfg.setConsistentId(igniteInstanceName);
+
+ DataStorageConfiguration dsCfg = new DataStorageConfiguration();
+ dsCfg.setWalSegmentSize(4 * 1024 * 1024);
+ dsCfg.setPageSize(1024);
+
+ dsCfg.setDefaultDataRegionConfiguration(new DataRegionConfiguration().
+ setInitialSize(100L * 1024 * 1024).
+ setMaxSize(200L * 1024 * 1024).
+ setPersistenceEnabled(true));
+
+ cfg.setDataStorageConfiguration(dsCfg);
+
+ cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME).
+ setAtomicityMode(TRANSACTIONAL).
+ setCacheMode(REPLICATED).
+ setAffinity(new RendezvousAffinityFunction(false, PARTS)));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ cleanPersistenceDir();
+ }
+
+ /** */
+ @SuppressWarnings("ConstantConditions")
+ @Test
+ public void testNoSpuriousRebalancing() throws Exception {
+ try {
+ IgniteEx crd = startGrids(2);
+
+ crd.cluster().active(true);
+ crd.cluster().baselineAutoAdjustEnabled(false);
+
+ List<Integer> moving = movingKeysAfterJoin(crd,
DEFAULT_CACHE_NAME, 10);
+
+ int[] primParts =
crd.affinity(DEFAULT_CACHE_NAME).primaryPartitions(crd.localNode());
+
+ Arrays.sort(primParts);
+
+ int primChangePartId = -1; // This partition will be new primary
on joining node.
+
+ for (int id : moving) {
+ if (Arrays.binarySearch(primParts, id) >= 0) {
+ primChangePartId = id;
+
+ break;
+ }
+ }
+
+ assertTrue(primChangePartId != -1);
+
+ startGrid(2);
+
+ resetBaselineTopology(); // Trigger partition movement.
+
+ awaitPartitionMapExchange();
+
+ GridCacheContext<Object, Object> ctx =
crd.cachex(DEFAULT_CACHE_NAME).context();
+ AffinityAssignment a0 = ctx.affinity().assignment(new
AffinityTopologyVersion(3, 1));
+
+ List<ClusterNode> nodes = a0.get(primChangePartId);
+
+ assertEquals(3, nodes.size());
+
+ assertEquals(crd.configuration().getConsistentId(),
nodes.get(0).consistentId());
+
+ awaitPartitionMapExchange();
+
+ for (int k = 0; k < PARTS * 2; k++)
+ crd.cache(DEFAULT_CACHE_NAME).put(k, k);
+
+ forceCheckpoint();
+
+ stopGrid(2);
+
+ // Forge the counter on coordinator for switching partition.
+ GridDhtLocalPartition part =
ctx.topology().localPartition(primChangePartId);
+
+ assertNotNull(part);
+
+ PartitionTxUpdateCounterImpl cntr0 =
(PartitionTxUpdateCounterImpl)part.dataStore().partUpdateCounter();
+
+ AtomicLong cntr = U.field(cntr0, "cntr");
+
+ cntr.set(cntr.get() - 1);
+
+ TestRecordingCommunicationSpi.spi(crd).record((node, msg) -> msg
instanceof GridDhtPartitionDemandMessage);
+
+ startGrid(2);
+
+ awaitPartitionMapExchange();
+
+ // Expecting no rebalancing.
+ List<Object> msgs =
TestRecordingCommunicationSpi.spi(crd).recordedMessages(true);
+
+ assertTrue("Rebalancing is not expected " + msgs, msgs.isEmpty());
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index 2254956..00e77f9 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -29,6 +29,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuo
import
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest;
import
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRemoveDuringRebalancingTest;
import
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRestartAfterFailedToWriteMetaPageTest;
+import
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsSpuriousRebalancingOnNodeJoinTest;
import
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTaskCancelingTest;
import
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheWalDisabledOnRebalancingTest;
import
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageEvictionDuringPartitionClearTest;
@@ -79,6 +80,7 @@ public class IgnitePdsTestSuite4 {
GridTestUtils.addTestIfNeeded(suite,
IgnitePdsRestartAfterFailedToWriteMetaPageTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IgnitePdsRemoveDuringRebalancingTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
IgnitePdsSpuriousRebalancingOnNodeJoinTest.class, ignoredTests);
// Page lock tracker tests.
GridTestUtils.addTestIfNeeded(suite, PageLockTrackerManagerTest.class,
ignoredTests);