This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 1c34e2daca5 IGNITE-22026 Fix incorrect retry logic in
GridCacheIoManager send method (#11311)
1c34e2daca5 is described below
commit 1c34e2daca546f46a9a0c7eb05a7e9d45fa75d59
Author: Ilya Shishkov <[email protected]>
AuthorDate: Fri Apr 19 10:51:16 2024 +0300
IGNITE-22026 Fix incorrect retry logic in GridCacheIoManager send method
(#11311)
---
.../processors/cache/GridCacheIoManager.java | 18 +-
.../cache/GridCacheIoManagerRetryTest.java | 208 +++++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite13.java | 3 +
3 files changed, 218 insertions(+), 11 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index b4f291977ff..201bd14f698 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -1204,12 +1204,13 @@ public class GridCacheIoManager extends
GridCacheSharedManagerAdapter {
int cnt = 0;
- while (cnt <= retryCnt) {
+ while (true) {
try {
- cnt++;
-
cctx.gridIO().sendToGridTopic(node, TOPIC_CACHE, msg, plc);
+ if (log.isDebugEnabled())
+ log.debug("Sent cache message [msg=" + msg + ", node=" +
U.toShortString(node) + ']');
+
return;
}
catch (ClusterTopologyCheckedException e) {
@@ -1219,7 +1220,7 @@ public class GridCacheIoManager extends
GridCacheSharedManagerAdapter {
if (!cctx.discovery().alive(node.id()) ||
!cctx.discovery().pingNode(node.id()))
throw new ClusterTopologyCheckedException("Node left grid
while sending message to: " + node.id(), e);
- if (cnt == retryCnt || cctx.kernalContext().isStopping())
+ if (cnt++ >= retryCnt || cctx.kernalContext().isStopping())
throw e;
else if (log.isDebugEnabled())
log.debug("Failed to send message to node (will retry): "
+ node.id());
@@ -1227,9 +1228,6 @@ public class GridCacheIoManager extends
GridCacheSharedManagerAdapter {
U.sleep(retryDelay);
}
-
- if (log.isDebugEnabled())
- log.debug("Sent cache message [msg=" + msg + ", node=" +
U.toShortString(node) + ']');
}
/**
@@ -1267,10 +1265,8 @@ public class GridCacheIoManager extends
GridCacheSharedManagerAdapter {
int cnt = 0;
- while (cnt <= retryCnt) {
+ while (true) {
try {
- cnt++;
-
cctx.gridIO().sendOrderedMessage(node, topic, msg, plc,
timeout, false);
if (log.isDebugEnabled())
@@ -1286,7 +1282,7 @@ public class GridCacheIoManager extends
GridCacheSharedManagerAdapter {
if (cctx.discovery().node(node.id()) == null)
throw new ClusterTopologyCheckedException("Node left grid
while sending ordered message to: " + node.id(), e);
- if (cnt == retryCnt)
+ if (cnt++ >= retryCnt)
throw e;
else if (log.isDebugEnabled())
log.debug("Failed to send message to node (will retry): "
+ node.id());
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIoManagerRetryTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIoManagerRetryTest.java
new file mode 100644
index 00000000000..0ab5cd53822
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIoManagerRetryTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
+import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
+import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
+import org.apache.ignite.internal.processors.pool.PoolProcessor;
+import org.apache.ignite.internal.util.lang.RunnableX;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.deployment.local.LocalDeploymentSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestNode;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import static
org.apache.ignite.configuration.IgniteConfiguration.DFLT_SEND_RETRY_CNT;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
+import static
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+
+/** */
+@RunWith(Parameterized.class)
+public class GridCacheIoManagerRetryTest extends GridCommonAbstractTest {
+ /** Remote node. */
+ private static final ClusterNode REMOTE_NODE = new
GridTestNode(UUID.randomUUID());
+
+ /** Local node. */
+ private static final ClusterNode LOCAL_NODE = new
GridTestNode(UUID.randomUUID());
+
+ /** Retry count. */
+ @Parameter
+ public int retryCnt;
+
+ /** */
+ @Parameters(name = "retryCnt={0}")
+ public static Iterable<Integer> parameters() {
+ return Arrays.asList(0, 1, DFLT_SEND_RETRY_CNT, 10);
+ }
+
+ /** */
+ @Test
+ public void testSend() throws Exception {
+ // Only the cluster node argument is relevant to the test.
+ doTest(ctx -> () -> gridCacheIoManager(ctx)
+ .send(REMOTE_NODE, new GridNearTxFinishResponse(),
SYSTEM_POOL));
+ }
+
+ /** */
+ @Test
+ public void testSendOrdered() throws Exception {
+ // Only the cluster node argument is relevant to the test.
+ doTest(ctx -> () -> gridCacheIoManager(ctx)
+ .sendOrderedMessage(
+ REMOTE_NODE,
+ TOPIC_CACHE,
+ new GridNearTxFinishResponse(),
+ SYSTEM_POOL,
+ Long.MAX_VALUE
+ ));
+ }
+
+ /** */
+ private void doTest(Function<GridKernalContext, RunnableX> action) throws
Exception {
+ AtomicInteger sendCnt = new AtomicInteger();
+
+ GridKernalContext ctx = kernalContext(retryCnt, sendCnt);
+
+ Throwable actionRes = assertThrows(log(), action.apply(ctx),
IgniteException.class, "Test cause");
+
+ // RunnableX rethrows IgniteCheckedException as IgniteException.
+ assertTrue(X.hasCause(actionRes, IgniteCheckedException.class));
+
+ assertEquals("Unexpected send count", retryCnt + 1 /* first send +
retries */,
+ sendCnt.get());
+ }
+
+ /**
+ * Initialize GridCacheIoManager and GridCacheSharedContext.
+ *
+ * @param ctx Kernal context.
+ */
+ @SuppressWarnings("unchecked")
+ @NotNull private static GridCacheIoManager
gridCacheIoManager(GridKernalContext ctx) throws IgniteCheckedException {
+ GridCacheIoManager cacheIoMgr = new GridCacheIoManager();
+
+ GridCacheSharedContext<?, ?> cctx = GridCacheSharedContext.builder()
+ .setIoManager(cacheIoMgr)
+ .setPartitionExchangeManager(new
GridCachePartitionExchangeManager<>())
+ .build(ctx, null);
+
+ cacheIoMgr.start(cctx);
+
+ return cacheIoMgr;
+ }
+
+ /**
+ * Configure and initialize GridKernalContext.
+ *
+ * @param retryCnt Network send retry count.
+ * @param sendCnt Send attempts counter.
+ */
+ private GridKernalContext kernalContext(int retryCnt, AtomicInteger
sendCnt) throws IgniteCheckedException {
+ GridTestKernalContext ctx = newContext();
+
+ // Necessary to init GridKernalContext.
+ ctx.config().setPeerClassLoadingEnabled(true);
+ ctx.config().setDeploymentSpi(new LocalDeploymentSpi());
+ ctx.config().setCommunicationSpi(new TcpCommunicationSpi());
+
+ ctx.config().setNetworkSendRetryCount(retryCnt);
+ ctx.config().setNetworkSendRetryDelay(1);
+
+ // Discovery returns remote and local nodes and successfully pings
remote node.
+ ctx.config().setDiscoverySpi(new TcpDiscoverySpi() {
+ @Override public @Nullable ClusterNode getNode(UUID nodeId) {
+ return nodeId.equals(REMOTE_NODE.id()) ? REMOTE_NODE :
+ nodeId.equals(LOCAL_NODE.id()) ? LOCAL_NODE : null;
+ }
+
+ @Override public boolean pingNode(UUID nodeId) {
+ return nodeId.equals(REMOTE_NODE.id());
+ }
+
+ @Override public ClusterNode getLocalNode() {
+ return LOCAL_NODE;
+ }
+ });
+
+ // Necessary to init GridKernalContext.
+ ctx.add(new PoolProcessor(ctx));
+ ctx.add(new GridSystemViewManager(ctx));
+ ctx.add(new IgnitePluginProcessor(ctx, ctx.config(),
Collections.emptyList()));
+ ctx.add(new GridDeploymentManager(ctx));
+
+ ctx.add(new GridDiscoveryManager(ctx) {
+ @Override public @Nullable ClusterNode node(UUID nodeId) {
+ return nodeId.equals(REMOTE_NODE.id()) ? REMOTE_NODE :
+ nodeId.equals(LOCAL_NODE.id()) ? LOCAL_NODE : null;
+ }
+ });
+
+ configureGridIoManager(ctx, sendCnt);
+
+ return ctx;
+ }
+
+ /**
+ * Configure GridIoManager, which throws exception and counts sending
attempts.
+ *
+ * @param ctx Context.
+ * @param sendCnt Send attempts counter.
+ */
+ private void configureGridIoManager(GridTestKernalContext ctx,
AtomicInteger sendCnt) {
+ ctx.add(new GridIoManager(ctx) {
+ @Override public void sendToGridTopic(ClusterNode node, GridTopic
topic, Message msg, byte plc)
+ throws IgniteCheckedException {
+ sendCnt.incrementAndGet();
+
+ throw new IgniteCheckedException("Test cause");
+ }
+
+ @Override public void sendOrderedMessage(ClusterNode node, Object
topic, Message msg, byte plc,
+ long timeout, boolean skipOnTimeout) throws
IgniteCheckedException {
+ sendCnt.incrementAndGet();
+
+ throw new IgniteCheckedException("Test cause");
+ }
+ });
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java
index 9682d66c851..6c9daabd47d 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java
@@ -39,6 +39,7 @@ import
org.apache.ignite.internal.processors.cache.CacheClearAsyncDeadlockTest;
import
org.apache.ignite.internal.processors.cache.CacheDistributedGetLongRunningFutureDumpTest;
import org.apache.ignite.internal.processors.cache.EntriesRemoveOnShutdownTest;
import
org.apache.ignite.internal.processors.cache.GridCacheDataTypesCoverageTest;
+import org.apache.ignite.internal.processors.cache.GridCacheIoManagerRetryTest;
import
org.apache.ignite.internal.processors.cache.GridCacheLongRunningTransactionDiagnosticsTest;
import
org.apache.ignite.internal.processors.cache.GridCacheVersionGenerationWithCacheStorageTest;
import
org.apache.ignite.internal.processors.cache.distributed.FailBackupOnAtomicOperationTest;
@@ -118,6 +119,8 @@ public class IgniteCacheTestSuite13 {
GridTestUtils.addTestIfNeeded(suite,
CacheDistributedGetLongRunningFutureDumpTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
EntriesRemoveOnShutdownTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
GridCacheIoManagerRetryTest.class, ignoredTests);
+
return suite;
}
}