This is an automated email from the ASF dual-hosted git repository.

iluo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f86000  Improve/heartbeat (#3276)
0f86000 is described below

commit 0f860003e38896a7f3620bf7a8d525973b40ac9e
Author: xujingfeng <[email protected]>
AuthorDate: Tue Jan 22 11:05:10 2019 +0800

    Improve/heartbeat (#3276)
    
    * add the notice of code style
    
    * modify the pic
    
    * del teh faq.md, move to dubbo admin
    
    * improve:remove the heartbeat on server side
    
    * improve:change the scope of timer to static
---
 .../exchange/support/header/CloseTimerTask.java    | 55 ++++++++++++++++++++++
 .../support/header/HeaderExchangeClient.java       | 40 ++++++----------
 .../support/header/HeaderExchangeServer.java       | 51 +++++++-------------
 .../support/header/ReconnectTimerTask.java         | 33 ++++++-------
 4 files changed, 100 insertions(+), 79 deletions(-)

diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/CloseTimerTask.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/CloseTimerTask.java
new file mode 100644
index 0000000..4081c30
--- /dev/null
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/CloseTimerTask.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.remoting.exchange.support.header;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.Channel;
+
+/**
+ * CloseTimerTask
+ */
+public class CloseTimerTask extends AbstractTimerTask {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(CloseTimerTask.class);
+
+    private final int idleTimeout;
+
+    public CloseTimerTask(ChannelProvider channelProvider, Long 
heartbeatTimeoutTick, int idleTimeout) {
+        super(channelProvider, heartbeatTimeoutTick);
+        this.idleTimeout = idleTimeout;
+    }
+
+    @Override
+    protected void doTask(Channel channel) {
+        try {
+            Long lastRead = lastRead(channel);
+            Long lastWrite = lastWrite(channel);
+            Long now = now();
+            // check ping & pong at server
+            if ((lastRead != null && now - lastRead > idleTimeout)
+                    || (lastWrite != null && now - lastWrite > idleTimeout)) {
+                logger.warn("Close channel " + channel + ", because idleCheck 
timeout: "
+                        + idleTimeout + "ms");
+                channel.close();
+            }
+        } catch (Throwable t) {
+            logger.warn("Exception when close remote channel " + 
channel.getRemoteAddress(), t);
+        }
+    }
+}
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
index 3abbe5b..a28d53e 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.remoting.exchange.support.header;
 import org.apache.dubbo.common.Constants;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.timer.HashedWheelTimer;
+import org.apache.dubbo.common.utils.Assert;
 import org.apache.dubbo.common.utils.NamedThreadFactory;
 import org.apache.dubbo.remoting.ChannelHandler;
 import org.apache.dubbo.remoting.Client;
@@ -39,32 +40,27 @@ public class HeaderExchangeClient implements ExchangeClient 
{
 
     private final Client client;
     private final ExchangeChannel channel;
-    // heartbeat(ms), default value is 0 , won't execute a heartbeat.
     private int heartbeat;
-    private int heartbeatTimeout;
+    private int idleTimeout;
 
-    private HashedWheelTimer heartbeatTimer;
+    private static HashedWheelTimer idleCheckTimer = new HashedWheelTimer(new 
NamedThreadFactory("dubbo-client-idleCheck", true), 1,
+            TimeUnit.SECONDS, Constants.TICKS_PER_WHEEL);
 
     public HeaderExchangeClient(Client client, boolean needHeartbeat) {
-        if (client == null) {
-            throw new IllegalArgumentException("client == null");
-        }
+        Assert.notNull(client, "Client can't be null");
         this.client = client;
         this.channel = new HeaderExchangeChannel(client);
         String dubbo = 
client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
 
         this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, 
dubbo != null &&
                 dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
-        this.heartbeatTimeout = 
client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
-        if (heartbeatTimeout < heartbeat * 2) {
-            throw new IllegalStateException("heartbeatTimeout < 
heartbeatInterval * 2");
+        this.idleTimeout = 
client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
+        if (idleTimeout < heartbeat * 2) {
+            throw new IllegalStateException("idleTimeout < heartbeatInterval * 
2");
         }
 
         if (needHeartbeat) {
-            long tickDuration = calculateLeastDuration(heartbeat);
-            heartbeatTimer = new HashedWheelTimer(new 
NamedThreadFactory("dubbo-client-heartbeat", true), tickDuration,
-                    TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
-            startHeartbeatTimer();
+            startIdleCheckTask();
         }
     }
 
@@ -178,28 +174,20 @@ public class HeaderExchangeClient implements 
ExchangeClient {
         return channel.hasAttribute(key);
     }
 
-    private void startHeartbeatTimer() {
+    private void startIdleCheckTask() {
         AbstractTimerTask.ChannelProvider cp = () -> 
Collections.singletonList(HeaderExchangeClient.this);
 
         long heartbeatTick = calculateLeastDuration(heartbeat);
-        long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
+        long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout);
         HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, 
heartbeatTick, heartbeat);
-        ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, 
heartbeatTimeoutTick, heartbeatTimeout);
+        ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, 
heartbeatTimeoutTick, idleTimeout);
 
         // init task and start timer.
-        heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, 
TimeUnit.MILLISECONDS);
-        heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, 
TimeUnit.MILLISECONDS);
-    }
-
-    private void stopHeartbeatTimer() {
-        if (heartbeatTimer != null) {
-            heartbeatTimer.stop();
-            heartbeatTimer = null;
-        }
+        idleCheckTimer.newTimeout(heartBeatTimerTask, heartbeatTick, 
TimeUnit.MILLISECONDS);
+        idleCheckTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, 
TimeUnit.MILLISECONDS);
     }
 
     private void doClose() {
-        stopHeartbeatTimer();
     }
 
     /**
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
index e339c15..4609d2a 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.common.Version;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.timer.HashedWheelTimer;
+import org.apache.dubbo.common.utils.Assert;
 import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.NamedThreadFactory;
 import org.apache.dubbo.remoting.Channel;
@@ -48,25 +49,23 @@ public class HeaderExchangeServer implements ExchangeServer 
{
     protected final Logger logger = LoggerFactory.getLogger(getClass());
 
     private final Server server;
-    // heartbeat timeout (ms), default value is 0 , won't execute a heartbeat.
     private int heartbeat;
-    private int heartbeatTimeout;
+    private int idleTimeout;
     private AtomicBoolean closed = new AtomicBoolean(false);
 
-    private HashedWheelTimer heartbeatTimer;
+    private static HashedWheelTimer idleCheckTimer = new HashedWheelTimer(new 
NamedThreadFactory("dubbo-server-idleCheck", true), 1,
+            TimeUnit.SECONDS, Constants.TICKS_PER_WHEEL);
 
     public HeaderExchangeServer(Server server) {
-        if (server == null) {
-            throw new IllegalArgumentException("server == null");
-        }
+        Assert.notNull(server, "server == null");
         this.server = server;
         this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 
0);
-        this.heartbeatTimeout = 
server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
-        if (heartbeatTimeout < heartbeat * 2) {
-            throw new IllegalStateException("heartbeatTimeout < 
heartbeatInterval * 2");
+        this.idleTimeout = 
server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
+        if (idleTimeout < heartbeat * 2) {
+            throw new IllegalStateException("idleTimeout < heartbeatInterval * 
2");
         }
 
-        startHeartbeatTimer();
+        startIdleCheckTask();
     }
 
     public Server getServer() {
@@ -149,7 +148,6 @@ public class HeaderExchangeServer implements ExchangeServer 
{
         if (!closed.compareAndSet(false, true)) {
             return;
         }
-        stopHeartbeatTimer();
     }
 
     @Override
@@ -210,14 +208,13 @@ public class HeaderExchangeServer implements 
ExchangeServer {
                 int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat);
                 int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 
3);
                 if (t < h * 2) {
-                    throw new IllegalStateException("heartbeatTimeout < 
heartbeatInterval * 2");
+                    throw new IllegalStateException("idleTimeout < 
heartbeatInterval * 2");
                 }
-                if (h != heartbeat || t != heartbeatTimeout) {
+                if (h != heartbeat || t != idleTimeout) {
                     heartbeat = h;
-                    heartbeatTimeout = t;
+                    idleTimeout = t;
 
-                    stopHeartbeatTimer();
-                    startHeartbeatTimer();
+                    startIdleCheckTask();
                 }
             }
         } catch (Throwable t) {
@@ -260,28 +257,14 @@ public class HeaderExchangeServer implements 
ExchangeServer {
         }
     }
 
-    private void startHeartbeatTimer() {
-        long tickDuration = calculateLeastDuration(heartbeat);
-        heartbeatTimer = new HashedWheelTimer(new 
NamedThreadFactory("dubbo-server-heartbeat", true), tickDuration,
-                TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
-
+    private void startIdleCheckTask() {
         AbstractTimerTask.ChannelProvider cp = () -> 
unmodifiableCollection(HeaderExchangeServer.this.getChannels());
 
-        long heartbeatTick = calculateLeastDuration(heartbeat);
-        long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
-        HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, 
heartbeatTick, heartbeat);
-        ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, 
heartbeatTimeoutTick, heartbeatTimeout);
+        long idleTimeoutTick = calculateLeastDuration(idleTimeout);
+        CloseTimerTask closeTimerTask = new CloseTimerTask(cp, 
idleTimeoutTick, idleTimeout);
 
         // init task and start timer.
-        heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, 
TimeUnit.MILLISECONDS);
-        heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, 
TimeUnit.MILLISECONDS);
-    }
-
-    private void stopHeartbeatTimer() {
-        if (heartbeatTimer != null) {
-            heartbeatTimer.stop();
-            heartbeatTimer = null;
-        }
+        idleCheckTimer.newTimeout(closeTimerTask, idleTimeoutTick, 
TimeUnit.MILLISECONDS);
     }
 
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java
index 2b7dca5..3c0e938 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java
@@ -29,35 +29,30 @@ public class ReconnectTimerTask extends AbstractTimerTask {
 
     private static final Logger logger = 
LoggerFactory.getLogger(ReconnectTimerTask.class);
 
-    private final int heartbeatTimeout;
+    private final int idleTimeout;
 
-    ReconnectTimerTask(ChannelProvider channelProvider, Long 
heartbeatTimeoutTick, int heartbeatTimeout1) {
+    public ReconnectTimerTask(ChannelProvider channelProvider, Long 
heartbeatTimeoutTick, int idleTimeout) {
         super(channelProvider, heartbeatTimeoutTick);
-        this.heartbeatTimeout = heartbeatTimeout1;
+        this.idleTimeout = idleTimeout;
     }
 
     @Override
     protected void doTask(Channel channel) {
-        Long lastRead = lastRead(channel);
-        Long now = now();
-        if (lastRead != null && now - lastRead > heartbeatTimeout) {
-            if (channel instanceof Client) {
+        try {
+            Long lastRead = lastRead(channel);
+            Long now = now();
+            // check pong at client
+            if (lastRead != null && now - lastRead > idleTimeout) {
+                logger.warn("Close channel " + channel + ", because heartbeat 
read idle time out: "
+                        + idleTimeout + "ms");
                 try {
-                    logger.warn("Reconnect to remote channel " + 
channel.getRemoteAddress() + ", because heartbeat read idle time out: "
-                            + heartbeatTimeout + "ms");
                     ((Client) channel).reconnect();
-                } catch (Throwable t) {
-                    // do nothing
-                }
-            } else {
-                try {
-                    logger.warn("Close channel " + channel + ", because 
heartbeat read idle time out: "
-                            + heartbeatTimeout + "ms");
-                    channel.close();
-                } catch (Throwable t) {
-                    logger.warn("Exception when close channel " + channel, t);
+                } catch (Exception e) {
+                    logger.error(channel + "reconnect failed during idle 
time.", e);
                 }
             }
+        } catch (Throwable t) {
+            logger.warn("Exception when reconnect to remote channel " + 
channel.getRemoteAddress(), t);
         }
     }
 }

Reply via email to