This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new cc6f4d7 IGNITE-12671 Ignoring single messages during PME can prevent
late affinity switch. - Fixes #7425.
cc6f4d7 is described below
commit cc6f4d7814493aaeb485b39056d6afd44b98919d
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Fri Feb 14 10:42:38 2020 +0300
IGNITE-12671 Ignoring single messages during PME can prevent late affinity
switch. - Fixes #7425.
Signed-off-by: Aleksei Scherbakov <[email protected]>
---
.../cache/GridCachePartitionExchangeManager.java | 79 ++++------
.../cache/RebalanceCompleteDuringExchangeTest.java | 161 +++++++++++++++++++++
.../ignite/testsuites/IgnitePdsTestSuite4.java | 2 +
3 files changed, 192 insertions(+), 50 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index e78ada3..cc65919 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -80,7 +80,6 @@ import
org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import
org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ForceRebalanceExchangeTask;
@@ -405,32 +404,8 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
return;
}
}
- else if (exchangeInProgress()) {
- if (log.isInfoEnabled())
- log.info("Ignore single message without exchange
id (there is exchange in progress) [nodeId=" + node.id() + "]");
-
- return;
- }
-
- if (!crdInitFut.isDone() && !msg.restoreState()) {
- GridDhtPartitionExchangeId exchId = msg.exchangeId();
-
- if (log.isInfoEnabled()) {
- log.info("Waiting for coordinator initialization
[node=" + node.id() +
- ", nodeOrder=" + node.order() +
- ", ver=" + (exchId != null ?
exchId.topologyVersion() : null) + ']');
- }
-
- crdInitFut.listen(new CI1<IgniteInternalFuture>() {
- @Override public void apply(IgniteInternalFuture
fut) {
- processSinglePartitionUpdate(node, msg);
- }
- });
- return;
- }
-
- processSinglePartitionUpdate(node, msg);
+ preprocessSingleMessage(node, msg);
}
});
@@ -517,6 +492,34 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
}
/**
+ * Preprocess {@code msg} which was sent by {@code node}.
+ *
+ * @param node Cluster node.
+ * @param msg Message.
+ */
+ private void preprocessSingleMessage(ClusterNode node,
GridDhtPartitionsSingleMessage msg) {
+ if (!crdInitFut.isDone() && !msg.restoreState()) {
+ GridDhtPartitionExchangeId exchId = msg.exchangeId();
+
+ if (log.isInfoEnabled()) {
+ log.info("Waiting for coordinator initialization [node=" +
node.id() +
+ ", nodeOrder=" + node.order() +
+ ", ver=" + (exchId != null ? exchId.topologyVersion() :
null) + ']');
+ }
+
+ crdInitFut.listen(new CI1<IgniteInternalFuture>() {
+ @Override public void apply(IgniteInternalFuture fut) {
+ processSinglePartitionUpdate(node, msg);
+ }
+ });
+
+ return;
+ }
+
+ processSinglePartitionUpdate(node, msg);
+ }
+
+ /**
*
*/
public void onCoordinatorInitialized() {
@@ -2795,30 +2798,6 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
return exchWorker != null && Thread.currentThread() ==
exchWorker.runner();
}
- /**
- * @return {@code True} If there is any exchange future in progress.
- */
- private boolean exchangeInProgress() {
- if (exchWorker.hasPendingServerExchange())
- return true;
-
- GridDhtPartitionsExchangeFuture current = lastTopologyFuture();
-
- if (current == null)
- return false;
-
- GridDhtTopologyFuture finished = lastFinishedFut.get();
-
- if (finished == null ||
finished.result().compareTo(current.initialVersion()) < 0) {
- ClusterNode triggeredBy = current.firstEvent().eventNode();
-
- if (current.partitionChangesInProgress() &&
!triggeredBy.isClient())
- return true;
- }
-
- return false;
- }
-
/** */
public boolean affinityChanged(AffinityTopologyVersion from,
AffinityTopologyVersion to) {
if (lastAffinityChangedTopologyVersion(to).compareTo(from) >= 0)
diff --git
a/modules/core/src/test/java/org/apache/ignite/cache/RebalanceCompleteDuringExchangeTest.java
b/modules/core/src/test/java/org/apache/ignite/cache/RebalanceCompleteDuringExchangeTest.java
new file mode 100644
index 0000000..242a8e7
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/cache/RebalanceCompleteDuringExchangeTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * This test blocks the exchange and waits for rebalance to complete. After the partition
rebalance has completed, the exchange is unblocked and
+ * the test waits for the ideal assignment.
+ */
+public class RebalanceCompleteDuringExchangeTest extends
GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String name)
throws Exception {
+ return super.getConfiguration(name)
+ .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
+ .setCacheMode(CacheMode.REPLICATED))
+ .setCommunicationSpi(new TestRecordingCommunicationSpi());
+ }
+
+ /**
+ * Waits for the ideal assignment of the configured cache.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalance() throws Exception {
+ IgniteEx ignite0 = startGrid(0);
+
+ ignite0.cluster().active(true);
+
+ IgniteCache cache = ignite0.cache(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < 2000; i++)
+ cache.put(i, i);
+
+ IgniteEx ignite1 = startNodeAndBlockRebalance(1);
+
+ TestRecordingCommunicationSpi commSpi2 = startNodeAndBlockExchange(2);
+
+ TestRecordingCommunicationSpi commSpi1 =
TestRecordingCommunicationSpi.spi(ignite1);
+
+ commSpi1.waitForRecorded();
+
+ info(getTestIgniteInstanceName(1) + " sent Single message to
coordinator.");
+
+ commSpi1.recordedMessages(true);
+
+ commSpi1.record(GridDhtPartitionsSingleMessage.class);
+
+ commSpi2.waitForBlocked();
+
+ info("Exchange is waiting Single message from " +
getTestIgniteInstanceName(2));
+
+ commSpi1.waitForBlocked();
+
+ info("Rebalance on " + getTestIgniteInstanceName(1) + " was blocked.");
+
+ commSpi1.stopBlock();
+
+ // The test waits until the message reporting rebalance completion has been sent.
+ commSpi1.waitForRecorded();
+
+ info("Rebalance on " + getTestIgniteInstanceName(1) + " was unblocked
and completed.");
+
+ commSpi2.stopBlock();
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
+ * Starts node and blocks exchange on it.
+ *
+ * @param nodeNum Number of node.
+ * @return Test communication spi.
+ * @throws Exception If failed.
+ */
+ public TestRecordingCommunicationSpi startNodeAndBlockExchange(int
nodeNum) throws Exception {
+ IgniteConfiguration cfg =
optimize(getConfiguration(getTestIgniteInstanceName(nodeNum)));
+
+ TestRecordingCommunicationSpi commSpi =
(TestRecordingCommunicationSpi)cfg.getCommunicationSpi();
+
+ commSpi.blockMessages(GridDhtPartitionsSingleMessage.class,
getTestIgniteInstanceName(0));
+
+ IgniteInternalFuture fut = GridTestUtils.runAsync(() -> {
+ try {
+ IgniteEx ignite2 = startGrid(cfg);
+ }
+ catch (Exception e) {
+ log.error("Start clustr exception " + e.getMessage(), e);
+ }
+ });
+
+ return commSpi;
+ }
+
+ /**
+ * Starts node and blocks rebalance.
+ *
+ * @param nodeNum Number of node.
+ * @throws Exception If failed.
+ */
+ public IgniteEx startNodeAndBlockRebalance(int nodeNum) throws Exception {
+ IgniteConfiguration cfg =
optimize(getConfiguration(getTestIgniteInstanceName(nodeNum)));
+
+ TestRecordingCommunicationSpi commSpi =
(TestRecordingCommunicationSpi)cfg.getCommunicationSpi();
+
+ commSpi.record((ClusterNode node, Message msg) -> {
+ if (msg instanceof GridDhtPartitionsSingleMessage) {
+ GridDhtPartitionsSingleMessage singleMessage =
(GridDhtPartitionsSingleMessage)msg;
+
+ if (singleMessage.exchangeId() == null)
+ return false;
+
+ return singleMessage.exchangeId().topologyVersion().equals(new
AffinityTopologyVersion(3, 0));
+ }
+
+ return false;
+ });
+
+ commSpi.blockMessages((ClusterNode node, Message msg) -> {
+ if (msg instanceof GridDhtPartitionDemandMessage) {
+ GridDhtPartitionDemandMessage demandMessage =
(GridDhtPartitionDemandMessage)msg;
+
+ return CU.cacheId(DEFAULT_CACHE_NAME) ==
demandMessage.groupId();
+ }
+
+ return false;
+ });
+
+ return startGrid(cfg);
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index 00e77f9..77492a0 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import org.apache.ignite.cache.RebalanceCompleteDuringExchangeTest;
import org.apache.ignite.cache.ResetLostPartitionTest;
import
org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactivateTestWithPersistenceAndMemoryReuse;
import
org.apache.ignite.internal.processors.cache.distributed.CachePageWriteLockUnlockTest;
@@ -81,6 +82,7 @@ public class IgnitePdsTestSuite4 {
GridTestUtils.addTestIfNeeded(suite,
IgnitePdsRestartAfterFailedToWriteMetaPageTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IgnitePdsRemoveDuringRebalancingTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IgnitePdsSpuriousRebalancingOnNodeJoinTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
RebalanceCompleteDuringExchangeTest.class, ignoredTests);
// Page lock tracker tests.
GridTestUtils.addTestIfNeeded(suite, PageLockTrackerManagerTest.class,
ignoredTests);