This is an automated email from the ASF dual-hosted git repository. timoninmaxim pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 1c34e2daca5 IGNITE-22026 Fix incorrect retry logic in GridCacheIoManager send method (#11311) 1c34e2daca5 is described below commit 1c34e2daca546f46a9a0c7eb05a7e9d45fa75d59 Author: Ilya Shishkov <shishkovi...@gmail.com> AuthorDate: Fri Apr 19 10:51:16 2024 +0300 IGNITE-22026 Fix incorrect retry logic in GridCacheIoManager send method (#11311) --- .../processors/cache/GridCacheIoManager.java | 18 +- .../cache/GridCacheIoManagerRetryTest.java | 208 +++++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite13.java | 3 + 3 files changed, 218 insertions(+), 11 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index b4f291977ff..201bd14f698 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -1204,12 +1204,13 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { int cnt = 0; - while (cnt <= retryCnt) { + while (true) { try { - cnt++; - cctx.gridIO().sendToGridTopic(node, TOPIC_CACHE, msg, plc); + if (log.isDebugEnabled()) + log.debug("Sent cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']'); + return; } catch (ClusterTopologyCheckedException e) { @@ -1219,7 +1220,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (!cctx.discovery().alive(node.id()) || !cctx.discovery().pingNode(node.id())) throw new ClusterTopologyCheckedException("Node left grid while sending message to: " + node.id(), e); - if (cnt == retryCnt || cctx.kernalContext().isStopping()) + if (cnt++ >= retryCnt || cctx.kernalContext().isStopping()) throw e; else if (log.isDebugEnabled()) log.debug("Failed to send message to node (will retry): " + node.id()); @@ -1227,9 +1228,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { U.sleep(retryDelay); } - - if (log.isDebugEnabled()) - log.debug("Sent cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']'); } /** @@ -1267,10 +1265,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { int cnt = 0; - while (cnt <= retryCnt) { + while (true) { try { - cnt++; - cctx.gridIO().sendOrderedMessage(node, topic, msg, plc, timeout, false); if (log.isDebugEnabled()) @@ -1286,7 +1282,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (cctx.discovery().node(node.id()) == null) throw new ClusterTopologyCheckedException("Node left grid while sending ordered message to: " + node.id(), e); - if (cnt == retryCnt) + if (cnt++ >= retryCnt) throw e; else if (log.isDebugEnabled()) log.debug("Failed to send message to node (will retry): " + node.id()); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIoManagerRetryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIoManagerRetryTest.java new file mode 100644 index 00000000000..0ab5cd53822 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIoManagerRetryTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Arrays; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.GridTopic; +import org.apache.ignite.internal.managers.communication.GridIoManager; +import org.apache.ignite.internal.managers.deployment.GridDeploymentManager; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.managers.systemview.GridSystemViewManager; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; +import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; +import org.apache.ignite.internal.processors.pool.PoolProcessor; +import org.apache.ignite.internal.util.lang.RunnableX; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.deployment.local.LocalDeploymentSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.testframework.GridTestNode; +import org.apache.ignite.testframework.junits.GridTestKernalContext; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_SEND_RETRY_CNT; +import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; +import static org.apache.ignite.testframework.GridTestUtils.assertThrows; + +/** */ +@RunWith(Parameterized.class) +public class GridCacheIoManagerRetryTest extends GridCommonAbstractTest { + /** Remote node. */ + private static final ClusterNode REMOTE_NODE = new GridTestNode(UUID.randomUUID()); + + /** Local node. */ + private static final ClusterNode LOCAL_NODE = new GridTestNode(UUID.randomUUID()); + + /** Retry count. */ + @Parameter + public int retryCnt; + + /** */ + @Parameters(name = "retryCnt={0}") + public static Iterable<Integer> parameters() { + return Arrays.asList(0, 1, DFLT_SEND_RETRY_CNT, 10); + } + + /** */ + @Test + public void testSend() throws Exception { + // Only cluster node argument is useful for test. + doTest(ctx -> () -> gridCacheIoManager(ctx) + .send(REMOTE_NODE, new GridNearTxFinishResponse(), SYSTEM_POOL)); + } + + /** */ + @Test + public void testSendOrdered() throws Exception { + // Only cluster node argument is useful for test. + doTest(ctx -> () -> gridCacheIoManager(ctx) + .sendOrderedMessage( + REMOTE_NODE, + TOPIC_CACHE, + new GridNearTxFinishResponse(), + SYSTEM_POOL, + Long.MAX_VALUE + )); + } + + /** */ + private void doTest(Function<GridKernalContext, RunnableX> action) throws Exception { + AtomicInteger sendCnt = new AtomicInteger(); + + GridKernalContext ctx = kernalContext(retryCnt, sendCnt); + + Throwable actionRes = assertThrows(log(), action.apply(ctx), IgniteException.class, "Test cause"); + + // RunnableX rethrows IgniteCheckedException as IgniteException. + assertTrue(X.hasCause(actionRes, IgniteCheckedException.class)); + + assertEquals("Unexpected send count", retryCnt + 1 /* first send + retries */, + sendCnt.get()); + } + + /** + * Initialize GridCacheIoManager and GridCacheSharedContext. + * + * @param ctx Kernal context. + */ + @SuppressWarnings("unchecked") + @NotNull private static GridCacheIoManager gridCacheIoManager(GridKernalContext ctx) throws IgniteCheckedException { + GridCacheIoManager cacheIoMgr = new GridCacheIoManager(); + + GridCacheSharedContext<?, ?> cctx = GridCacheSharedContext.builder() + .setIoManager(cacheIoMgr) + .setPartitionExchangeManager(new GridCachePartitionExchangeManager<>()) + .build(ctx, null); + + cacheIoMgr.start(cctx); + + return cacheIoMgr; + } + + /** + * Configure and initalize GridKernalContext. + * + * @param retryCnt Reconnect count. + * @param sendCnt Send attempts counter. + */ + private GridKernalContext kernalContext(int retryCnt, AtomicInteger sendCnt) throws IgniteCheckedException { + GridTestKernalContext ctx = newContext(); + + // Necessary to init GridKernalContext. + ctx.config().setPeerClassLoadingEnabled(true); + ctx.config().setDeploymentSpi(new LocalDeploymentSpi()); + ctx.config().setCommunicationSpi(new TcpCommunicationSpi()); + + ctx.config().setNetworkSendRetryCount(retryCnt); + ctx.config().setNetworkSendRetryDelay(1); + + // Discovery returns remote and local nodes and successfully pings remote node. + ctx.config().setDiscoverySpi(new TcpDiscoverySpi() { + @Override public @Nullable ClusterNode getNode(UUID nodeId) { + return nodeId.equals(REMOTE_NODE.id()) ? REMOTE_NODE : + nodeId.equals(LOCAL_NODE.id()) ? LOCAL_NODE : null; + } + + @Override public boolean pingNode(UUID nodeId) { + return nodeId.equals(REMOTE_NODE.id()); + } + + @Override public ClusterNode getLocalNode() { + return LOCAL_NODE; + } + }); + + // Necessary to init GridKernalContext. + ctx.add(new PoolProcessor(ctx)); + ctx.add(new GridSystemViewManager(ctx)); + ctx.add(new IgnitePluginProcessor(ctx, ctx.config(), Collections.emptyList())); + ctx.add(new GridDeploymentManager(ctx)); + + ctx.add(new GridDiscoveryManager(ctx) { + @Override public @Nullable ClusterNode node(UUID nodeId) { + return nodeId.equals(REMOTE_NODE.id()) ? REMOTE_NODE : + nodeId.equals(LOCAL_NODE.id()) ? LOCAL_NODE : null; + } + }); + + configureGridIoManager(ctx, sendCnt); + + return ctx; + } + + /** + * Configure GridIoManager, which throws exception and counts sending attempts. + * + * @param ctx Context. + * @param sendCnt Send attempts counter. + */ + private void configureGridIoManager(GridTestKernalContext ctx, AtomicInteger sendCnt) { + ctx.add(new GridIoManager(ctx) { + @Override public void sendToGridTopic(ClusterNode node, GridTopic topic, Message msg, byte plc) + throws IgniteCheckedException { + sendCnt.incrementAndGet(); + + throw new IgniteCheckedException("Test cause"); + } + + @Override public void sendOrderedMessage(ClusterNode node, Object topic, Message msg, byte plc, + long timeout, boolean skipOnTimeout) throws IgniteCheckedException { + sendCnt.incrementAndGet(); + + throw new IgniteCheckedException("Test cause"); + } + }); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java index 9682d66c851..6c9daabd47d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java @@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.CacheClearAsyncDeadlockTest; import org.apache.ignite.internal.processors.cache.CacheDistributedGetLongRunningFutureDumpTest; import org.apache.ignite.internal.processors.cache.EntriesRemoveOnShutdownTest; import org.apache.ignite.internal.processors.cache.GridCacheDataTypesCoverageTest; +import org.apache.ignite.internal.processors.cache.GridCacheIoManagerRetryTest; import org.apache.ignite.internal.processors.cache.GridCacheLongRunningTransactionDiagnosticsTest; import org.apache.ignite.internal.processors.cache.GridCacheVersionGenerationWithCacheStorageTest; import org.apache.ignite.internal.processors.cache.distributed.FailBackupOnAtomicOperationTest; @@ -118,6 +119,8 @@ public class IgniteCacheTestSuite13 { GridTestUtils.addTestIfNeeded(suite, CacheDistributedGetLongRunningFutureDumpTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, EntriesRemoveOnShutdownTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, GridCacheIoManagerRetryTest.class, ignoredTests); + return suite; } }