This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch gg-19225
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 5dcff9f6d73483e28aee4da53e293ac5518d713f Author: Sergey Chugunov <[email protected]> AuthorDate: Tue Jun 4 18:19:53 2019 +0300 GG-18656 attempt to restore connection to failed client may lead to firing SYSTEM_WORKER_BLOCKED failure in exchange-worker (cherry-picked from commit #c15ef97) --- .../cache/GridCachePartitionExchangeManager.java | 7 ++- .../spi/communication/tcp/TcpCommunicationSpi.java | 30 +++++++++- .../ignite/internal/IgniteClientFailuresTest.java | 69 ++++++++++++++++++---- 3 files changed, 91 insertions(+), 15 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 4f5d2fc..46dade4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -178,6 +178,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** Stripe id for cluster activation event. */ private static final int CLUSTER_ACTIVATION_EVT_STRIPE_ID = Integer.MAX_VALUE; + /** */ + private static final String EXCHANGE_WORKER_THREAD_NAME = "exchange-worker"; + /** Atomic reference for pending partition resend timeout object. */ private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>(); @@ -719,7 +722,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana else if (reconnect) reconnectExchangeFut.onDone(); - new IgniteThread(cctx.igniteInstanceName(), "exchange-worker", exchWorker).start(); + new IgniteThread(cctx.igniteInstanceName(), exchWorker.name(), exchWorker).start(); if (reconnect) { if (fut != null) { @@ -2753,7 +2756,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * Constructor. 
*/ private ExchangeWorker() { - super(cctx.igniteInstanceName(), "partition-exchanger", GridCachePartitionExchangeManager.this.log, + super(cctx.igniteInstanceName(), EXCHANGE_WORKER_THREAD_NAME, GridCachePartitionExchangeManager.this.log, cctx.kernalContext().workersRegistry()); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 2a9fb9a..16b7983 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -66,6 +66,7 @@ import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteFeatures; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; @@ -2954,7 +2955,34 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati else fut = oldFut; - client = fut.get(); + WorkersRegistry registry = getWorkersRegistry(ignite); + + long clientReserveWaitTimeout = registry != null ? registry.getSystemWorkerBlockedTimeout() / 3 + : connTimeout / 3; + + long currTimeout = System.currentTimeMillis(); + + // This cycle will eventually quit when future is completed by concurrent thread reserving client. 
+ while (true) { + try { + client = fut.get(clientReserveWaitTimeout, TimeUnit.MILLISECONDS); + + break; + } + catch (IgniteFutureTimeoutCheckedException ignored) { + currTimeout += clientReserveWaitTimeout; + + if (log.isDebugEnabled()) + log.debug("Still waiting for reestablishing connection to node [nodeId=" + node.id() + ", waitingTime=" + currTimeout + "ms]"); + + if (registry != null) { + GridWorker wrkr = registry.worker(Thread.currentThread().getName()); + + if (wrkr != null) + wrkr.updateHeartbeat(); + } + } + } if (client == null) { if (isLocalNodeDisconnected()) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java index 82522ae..f1d12f2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal; import org.apache.ignite.IgniteCache; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.cluster.IgniteClusterEx; import org.apache.ignite.internal.managers.GridManagerAdapter; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; @@ -35,7 +36,7 @@ import org.junit.Test; */ public class IgniteClientFailuresTest extends GridCommonAbstractTest { /** */ - private boolean clientMode; + private static final String EXCHANGE_WORKER_BLOCKED_MSG = "threadName=exchange-worker, blockedFor="; /** */ private GridStringLogger inMemoryLog; @@ -44,13 +45,15 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - 
cfg.setClientMode(clientMode); - - if (!clientMode) { + if (igniteInstanceName.contains("client")) + cfg.setClientMode(true); + else { cfg.setClientFailureDetectionTimeout(10_000); cfg.setSystemWorkerBlockedTimeout(5_000); + cfg.setNetworkTimeout(5_000); + cfg.setGridLogger(inMemoryLog); } @@ -77,13 +80,15 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest { */ @Test public void testNoMessagesFromFailureProcessor() throws Exception { - inMemoryLog = new GridStringLogger(false, new GridTestLog4jLogger()); + GridStringLogger strLog = new GridStringLogger(false, new GridTestLog4jLogger()); + + strLog.logLength(1024 * 1024); - inMemoryLog.logLength(1024 * 1024); + inMemoryLog = strLog; IgniteEx srv = startGrid(0); - clientMode = true; + inMemoryLog = null; IgniteEx client00 = startGrid("client00"); @@ -99,7 +104,7 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest { assertTrue(waitRes); - assertFalse(inMemoryLog.toString().contains("name=tcp-comm-worker")); + assertFalse(strLog.toString().contains("name=tcp-comm-worker")); } /** @@ -112,12 +117,8 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest { public void testFailedClientLeavesTopologyAfterTimeout() throws Exception { IgniteEx srv0 = startGrid(0); - clientMode = true; - IgniteEx client00 = startGrid("client00"); - Thread.sleep(5_000); - client00.getOrCreateCache(new CacheConfiguration<>("cache0")); breakClient(client00); @@ -138,6 +139,50 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest { assertTrue(waitRes); } + /** + * Test verifies that when some sys thread (on server node) tries to re-establish connection to failed client + * and exchange-worker gets blocked waiting for it (e.g. to send partitions full map) + * it is not treated as {@link FailureType#SYSTEM_WORKER_BLOCKED} + * because this waiting is finite and part of normal operations. + * + * @throws Exception If failed. 
+ */ + @Test + public void testExchangeWorkerIsNotTreatedAsBlockedWhenClientNodeFails() throws Exception { + GridStringLogger strLog = new GridStringLogger(false, new GridTestLog4jLogger()); + + strLog.logLength(1024 * 1024); + + inMemoryLog = strLog; + + IgniteEx srv0 = startGrid(0); + + inMemoryLog = null; + + IgniteEx client00 = startGrid("client00"); + + client00.getOrCreateCache(new CacheConfiguration<>("cache0")); + + startGrid(1); + + breakClient(client00); + + final IgniteClusterEx cl = srv0.cluster(); + + assertEquals(3, cl.topology(cl.topologyVersion()).size()); + + startGrid("client01"); + + boolean waitRes = GridTestUtils.waitForCondition(() -> (cl.topology(cl.topologyVersion()).size() == 3), + 20_000); + + assertTrue(waitRes); + + String logRes = strLog.toString(); + + assertFalse(logRes.contains(EXCHANGE_WORKER_BLOCKED_MSG)); + } + /** */ private void checkCacheOperations(IgniteCache cache) { for (int i = 0; i < 100; i++)
