ignite-4154
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8a6117c6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8a6117c6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8a6117c6 Branch: refs/heads/ignite-4154-opt2 Commit: 8a6117c60ea59c9679a9f84598aed363e5cef320 Parents: 83f411f Author: sboikov <[email protected]> Authored: Mon Nov 21 12:05:31 2016 +0300 Committer: sboikov <[email protected]> Committed: Mon Nov 21 12:05:31 2016 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 25 +++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8a6117c6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 3c5f736..b202cdb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2418,9 +2418,12 @@ class ServerImpl extends TcpDiscoveryImpl { /** Connection check threshold. */ private long connCheckThreshold; + /** */ + private long lastRingMsgTime; + /** */ - protected RingMessageWorker() { + RingMessageWorker() { super("tcp-disco-msg-worker", 10); initConnectionCheckFrequency(); @@ -2515,6 +2518,8 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msg Message to process. */ @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { + sendHeartbeatMessage(); + DebugLogger log = messageLogger(msg); if (log.isDebugEnabled()) @@ -2523,6 +2528,11 @@ class ServerImpl extends TcpDiscoveryImpl { if (debugMode) debugLog(msg, "Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']'); + boolean ensured = spi.ensured(msg); + + if (!locNode.id().equals(msg.senderNodeId()) && ensured) + lastRingMsgTime = U.currentTimeMillis(); + if (locNode.internalOrder() == 0) { boolean proc = false; @@ -2579,7 +2589,7 @@ class ServerImpl extends TcpDiscoveryImpl { else assert false : "Unknown message type: " + msg.getClass().getSimpleName(); - if (spi.ensured(msg) && redirectToClients(msg)) + if (ensured && redirectToClients(msg)) msgHist.add(msg); if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) { @@ -2600,8 +2610,6 @@ class ServerImpl extends TcpDiscoveryImpl { checkConnection(); - sendHeartbeatMessage(); - checkHeartbeatsReceiving(); checkPendingCustomMessages(); @@ -5420,12 +5428,9 @@ class ServerImpl extends TcpDiscoveryImpl { * Sends heartbeat message if needed. */ private void sendHeartbeatMessage() { - if (!isLocalNodeCoordinator()) - return; - long elapsed = (lastTimeHbMsgSent + spi.hbFreq) - U.currentTimeMillis(); - if (elapsed > 0) + if (elapsed > 0 || !isLocalNodeCoordinator()) return; TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getConfiguredNodeId()); @@ -5445,7 +5450,9 @@ class ServerImpl extends TcpDiscoveryImpl { if (lastTimeStatusMsgSent < locNode.lastUpdateTime()) lastTimeStatusMsgSent = locNode.lastUpdateTime(); - long elapsed = (lastTimeStatusMsgSent + hbCheckFreq) - U.currentTimeMillis(); + long updateTime = Math.max(lastTimeStatusMsgSent, lastRingMsgTime); + + long elapsed = (updateTime + hbCheckFreq) - U.currentTimeMillis(); if (elapsed > 0) return;
