ignite-4154

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8a6117c6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8a6117c6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8a6117c6

Branch: refs/heads/ignite-4154-opt2
Commit: 8a6117c60ea59c9679a9f84598aed363e5cef320
Parents: 83f411f
Author: sboikov <[email protected]>
Authored: Mon Nov 21 12:05:31 2016 +0300
Committer: sboikov <[email protected]>
Committed: Mon Nov 21 12:05:31 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 25 +++++++++++++-------
 1 file changed, 16 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8a6117c6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 3c5f736..b202cdb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2418,9 +2418,12 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Connection check threshold. */
         private long connCheckThreshold;
 
+        /** */
+        private long lastRingMsgTime;
+
         /**
          */
-        protected RingMessageWorker() {
+        RingMessageWorker() {
             super("tcp-disco-msg-worker", 10);
 
             initConnectionCheckFrequency();
@@ -2515,6 +2518,8 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message to process.
          */
         @Override protected void processMessage(TcpDiscoveryAbstractMessage 
msg) {
+            sendHeartbeatMessage();
+
             DebugLogger log = messageLogger(msg);
 
             if (log.isDebugEnabled())
@@ -2523,6 +2528,11 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (debugMode)
                 debugLog(msg, "Processing message [cls=" + 
msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
 
+            boolean ensured = spi.ensured(msg);
+
+            if (!locNode.id().equals(msg.senderNodeId()) && ensured)
+                lastRingMsgTime = U.currentTimeMillis();
+
             if (locNode.internalOrder() == 0) {
                 boolean proc = false;
 
@@ -2579,7 +2589,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             else
                 assert false : "Unknown message type: " + 
msg.getClass().getSimpleName();
 
-            if (spi.ensured(msg) && redirectToClients(msg))
+            if (ensured && redirectToClients(msg))
                 msgHist.add(msg);
 
             if (msg.senderNodeId() != null && 
!msg.senderNodeId().equals(getLocalNodeId())) {
@@ -2600,8 +2610,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             checkConnection();
 
-            sendHeartbeatMessage();
-
             checkHeartbeatsReceiving();
 
             checkPendingCustomMessages();
@@ -5420,12 +5428,9 @@ class ServerImpl extends TcpDiscoveryImpl {
          * Sends heartbeat message if needed.
          */
         private void sendHeartbeatMessage() {
-            if (!isLocalNodeCoordinator())
-                return;
-
             long elapsed = (lastTimeHbMsgSent + spi.hbFreq) - 
U.currentTimeMillis();
 
-            if (elapsed > 0)
+            if (elapsed > 0 || !isLocalNodeCoordinator())
                 return;
 
             TcpDiscoveryHeartbeatMessage msg = new 
TcpDiscoveryHeartbeatMessage(getConfiguredNodeId());
@@ -5445,7 +5450,9 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (lastTimeStatusMsgSent < locNode.lastUpdateTime())
                 lastTimeStatusMsgSent = locNode.lastUpdateTime();
 
-            long elapsed = (lastTimeStatusMsgSent + hbCheckFreq) - 
U.currentTimeMillis();
+            long updateTime = Math.max(lastTimeStatusMsgSent, lastRingMsgTime);
+
+            long elapsed = (updateTime + hbCheckFreq) - U.currentTimeMillis();
 
             if (elapsed > 0)
                 return;

Reply via email to