This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git

The following commit(s) were added to refs/heads/master by this push:
     new 03ee856  IGNITE-13016 : Timeouts and performance of backward checking of failed nodes is fixed. - Fixes #7838.
03ee856 is described below

commit 03ee85695014ff6aaa87e256d330d32342d34224
Author: Vladimir Steshin <vlads...@gmail.com>
AuthorDate: Tue Jul 21 13:24:25 2020 +0300

    IGNITE-13016 : Timeouts and performance of backward checking of failed nodes is fixed. - Fixes #7838.

    Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com>
---
 .../ignite/spi/discovery/tcp/ServerImpl.java       | 97 +++++++++++++++-------
 .../GridFailFastNodeFailureDetectionSelfTest.java  |  2 +-
 2 files changed, 68 insertions(+), 31 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index a43d8b1..06f8b75 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -54,9 +54,11 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLServerSocket;
@@ -384,17 +386,14 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         lastRingMsgSentTime = 0;
 
-        long msgExchangeTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
-            spi.getSocketTimeout() + spi.getAckTimeout();
-
         // Since we take in account time of last sent message, the interval should be quite short to give enough piece
         // of failure detection timeout as send-and-acknowledge timeout of the message to send.
-        connCheckInterval = Math.min(msgExchangeTimeout / 4, MAX_CON_CHECK_INTERVAL);
+        connCheckInterval = Math.min(effectiveExchangeTimeout() / 4, MAX_CON_CHECK_INTERVAL);
 
         utilityPool = new IgniteThreadPoolExecutor("disco-pool",
             spi.ignite().name(),
             0,
-            1,
+            4,
             2000,
             new LinkedBlockingQueue<>());
 
@@ -2009,6 +2008,12 @@ class ServerImpl extends TcpDiscoveryImpl {
         return threads;
     }
 
+    /** @return Complete timeout of single message exchange operation on established connection. */
+    protected long effectiveExchangeTimeout() {
+        return spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+            spi.getSocketTimeout() + spi.getAckTimeout();
+    }
+
     /** {@inheritDoc} */
     @Override public void updateMetrics(UUID nodeId,
         ClusterMetrics metrics,
@@ -6873,9 +6878,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                             // Need to check connectivity to it.
                             long rcvdTime = lastRingMsgReceivedTime;
                             long now = System.nanoTime();
+                            long timeThreshold = rcvdTime + U.millisToNanos(effectiveExchangeTimeout());
 
-                            // We got message from previous in less than double connection check interval.
-                            boolean ok = rcvdTime + U.millisToNanos(connCheckInterval) * 2 >= now;
+                            // We got message from previous in less than effective exchange timeout.
+                            boolean ok = timeThreshold > now;
                             TcpDiscoveryNode previous = null;
 
                             if (ok) {
@@ -6893,18 +6899,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 if (previous != null && !previous.id().equals(nodeId) &&
                                     (req.checkPreviousNodeId() == null ||
                                     previous.id().equals(req.checkPreviousNodeId()))) {
-                                    Collection<InetSocketAddress> nodeAddrs =
-                                        spi.getNodeAddresses(previous, false);
+                                    Collection<InetSocketAddress> nodeAddrs = spi.getNodeAddresses(previous, false);
 
-                                    for (InetSocketAddress addr : nodeAddrs) {
-                                        // Connection refused may be got if node doesn't listen
-                                        // (or blocked by firewall, but anyway assume it is dead).
-                                        if (!isConnectionRefused(addr)) {
-                                            liveAddr = addr;
-
-                                            break;
-                                        }
-                                    }
+                                    liveAddr = checkConnection(new ArrayList<>(nodeAddrs),
+                                        (int)U.nanosToMillis(timeThreshold - now));
 
                                     if (log.isInfoEnabled())
                                         log.info("Connection check done [liveAddr=" + liveAddr
@@ -7367,22 +7365,61 @@ class ServerImpl extends TcpDiscoveryImpl {
             lastRingMsgReceivedTime = System.nanoTime();
         }
 
-        /**
-         * @param addr Address to check.
-         * @return {@code True} if got connection refused on connect try.
-         */
-        private boolean isConnectionRefused(SocketAddress addr) {
-            try (Socket sock = new Socket()) {
-                sock.connect(addr, 100);
+        /** @return Alive address if was able to connected to. {@code Null} otherwise. */
+        private InetSocketAddress checkConnection(List<InetSocketAddress> addrs, int timeout) {
+            AtomicReference<InetSocketAddress> liveAddrHolder = new AtomicReference<>();
+
+            CountDownLatch latch = new CountDownLatch(addrs.size());
+
+            int addrLeft = addrs.size();
+
+            int threadsLeft = utilityPool.getMaximumPoolSize();
+
+            AtomicInteger addrIdx = new AtomicInteger();
+
+            while (addrLeft > 0) {
+                int addrPerThread = addrLeft / threadsLeft + (addrLeft % threadsLeft > 0 ? 1 : 0);
+
+                addrLeft -= addrPerThread;
+
+                --threadsLeft;
+
+                utilityPool.execute(new Thread() {
+                    private final int addrsToCheck = addrPerThread;
+
+                    /** */
+                    @Override public void run() {
+                        int perAddrTimeout = timeout / addrsToCheck;
+
+                        for (int i = 0; i < addrsToCheck; ++i) {
+                            InetSocketAddress addr = addrs.get(addrIdx.getAndIncrement());
+
+                            try (Socket sock = new Socket()) {
+                                if (liveAddrHolder.get() == null) {
+                                    sock.connect(addr, perAddrTimeout);
+
+                                    liveAddrHolder.compareAndSet(null, addr);
+                                }
+                            }
+                            catch (Exception ignored) {
+                                // No-op.
+                            }
+                            finally {
+                                latch.countDown();
+                            }
+                        }
+                    }
+                });
             }
-            catch (ConnectException e) {
-                return true;
+
+            try {
+                latch.await(timeout, TimeUnit.MILLISECONDS);
             }
-            catch (IOException e) {
-                return false;
+            catch (InterruptedException ignored) {
+                // No-op.
             }
 
-            return false;
+            return liveAddrHolder.get();
         }
 
         /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
index 73cb5f7..d727550 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
@@ -107,7 +107,7 @@ public class GridFailFastNodeFailureDetectionSelfTest extends GridCommonAbstract
 
         failNode(ignite1);
 
-        assert failLatch.await(1500, MILLISECONDS);
+        assert failLatch.await(ignite1.configuration().getFailureDetectionTimeout(), MILLISECONDS);
     }
 
     /**