This is an automated email from the ASF dual-hosted git repository.
iluo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new 0f86000 Improve/heartbeat (#3276)
0f86000 is described below
commit 0f860003e38896a7f3620bf7a8d525973b40ac9e
Author: xujingfeng <[email protected]>
AuthorDate: Tue Jan 22 11:05:10 2019 +0800
Improve/heartbeat (#3276)
* add the notice of code style
* modify the pic
* del teh faq.md, move to dubbo admin
* improve:remove the heartbeat on server side
* improve:change the scope of timer to static
---
.../exchange/support/header/CloseTimerTask.java | 55 ++++++++++++++++++++++
.../support/header/HeaderExchangeClient.java | 40 ++++++----------
.../support/header/HeaderExchangeServer.java | 51 +++++++-------------
.../support/header/ReconnectTimerTask.java | 33 ++++++-------
4 files changed, 100 insertions(+), 79 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/CloseTimerTask.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/CloseTimerTask.java
new file mode 100644
index 0000000..4081c30
--- /dev/null
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/CloseTimerTask.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.remoting.exchange.support.header;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.Channel;
+
+/**
+ * CloseTimerTask
+ */
+public class CloseTimerTask extends AbstractTimerTask {
+
+ private static final Logger logger =
LoggerFactory.getLogger(CloseTimerTask.class);
+
+ private final int idleTimeout;
+
+ public CloseTimerTask(ChannelProvider channelProvider, Long
heartbeatTimeoutTick, int idleTimeout) {
+ super(channelProvider, heartbeatTimeoutTick);
+ this.idleTimeout = idleTimeout;
+ }
+
+ @Override
+ protected void doTask(Channel channel) {
+ try {
+ Long lastRead = lastRead(channel);
+ Long lastWrite = lastWrite(channel);
+ Long now = now();
+ // check ping & pong at server
+ if ((lastRead != null && now - lastRead > idleTimeout)
+ || (lastWrite != null && now - lastWrite > idleTimeout)) {
+ logger.warn("Close channel " + channel + ", because idleCheck
timeout: "
+ + idleTimeout + "ms");
+ channel.close();
+ }
+ } catch (Throwable t) {
+ logger.warn("Exception when close remote channel " +
channel.getRemoteAddress(), t);
+ }
+ }
+}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
index 3abbe5b..a28d53e 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.remoting.exchange.support.header;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.timer.HashedWheelTimer;
+import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Client;
@@ -39,32 +40,27 @@ public class HeaderExchangeClient implements ExchangeClient
{
private final Client client;
private final ExchangeChannel channel;
- // heartbeat(ms), default value is 0 , won't execute a heartbeat.
private int heartbeat;
- private int heartbeatTimeout;
+ private int idleTimeout;
- private HashedWheelTimer heartbeatTimer;
+ private static HashedWheelTimer idleCheckTimer = new HashedWheelTimer(new
NamedThreadFactory("dubbo-client-idleCheck", true), 1,
+ TimeUnit.SECONDS, Constants.TICKS_PER_WHEEL);
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
- if (client == null) {
- throw new IllegalArgumentException("client == null");
- }
+ Assert.notNull(client, "Client can't be null");
this.client = client;
this.channel = new HeaderExchangeChannel(client);
String dubbo =
client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY,
dubbo != null &&
dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
- this.heartbeatTimeout =
client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
- if (heartbeatTimeout < heartbeat * 2) {
- throw new IllegalStateException("heartbeatTimeout <
heartbeatInterval * 2");
+ this.idleTimeout =
client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
+ if (idleTimeout < heartbeat * 2) {
+ throw new IllegalStateException("idleTimeout < heartbeatInterval *
2");
}
if (needHeartbeat) {
- long tickDuration = calculateLeastDuration(heartbeat);
- heartbeatTimer = new HashedWheelTimer(new
NamedThreadFactory("dubbo-client-heartbeat", true), tickDuration,
- TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
- startHeartbeatTimer();
+ startIdleCheckTask();
}
}
@@ -178,28 +174,20 @@ public class HeaderExchangeClient implements
ExchangeClient {
return channel.hasAttribute(key);
}
- private void startHeartbeatTimer() {
+ private void startIdleCheckTask() {
AbstractTimerTask.ChannelProvider cp = () ->
Collections.singletonList(HeaderExchangeClient.this);
long heartbeatTick = calculateLeastDuration(heartbeat);
- long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
+ long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout);
HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp,
heartbeatTick, heartbeat);
- ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp,
heartbeatTimeoutTick, heartbeatTimeout);
+ ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp,
heartbeatTimeoutTick, idleTimeout);
// init task and start timer.
- heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick,
TimeUnit.MILLISECONDS);
- heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick,
TimeUnit.MILLISECONDS);
- }
-
- private void stopHeartbeatTimer() {
- if (heartbeatTimer != null) {
- heartbeatTimer.stop();
- heartbeatTimer = null;
- }
+ idleCheckTimer.newTimeout(heartBeatTimerTask, heartbeatTick,
TimeUnit.MILLISECONDS);
+ idleCheckTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick,
TimeUnit.MILLISECONDS);
}
private void doClose() {
- stopHeartbeatTimer();
}
/**
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
index e339c15..4609d2a 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.timer.HashedWheelTimer;
+import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.remoting.Channel;
@@ -48,25 +49,23 @@ public class HeaderExchangeServer implements ExchangeServer
{
protected final Logger logger = LoggerFactory.getLogger(getClass());
private final Server server;
- // heartbeat timeout (ms), default value is 0 , won't execute a heartbeat.
private int heartbeat;
- private int heartbeatTimeout;
+ private int idleTimeout;
private AtomicBoolean closed = new AtomicBoolean(false);
- private HashedWheelTimer heartbeatTimer;
+ private static HashedWheelTimer idleCheckTimer = new HashedWheelTimer(new
NamedThreadFactory("dubbo-server-idleCheck", true), 1,
+ TimeUnit.SECONDS, Constants.TICKS_PER_WHEEL);
public HeaderExchangeServer(Server server) {
- if (server == null) {
- throw new IllegalArgumentException("server == null");
- }
+ Assert.notNull(server, "server == null");
this.server = server;
this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY,
0);
- this.heartbeatTimeout =
server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
- if (heartbeatTimeout < heartbeat * 2) {
- throw new IllegalStateException("heartbeatTimeout <
heartbeatInterval * 2");
+ this.idleTimeout =
server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
+ if (idleTimeout < heartbeat * 2) {
+ throw new IllegalStateException("idleTimeout < heartbeatInterval *
2");
}
- startHeartbeatTimer();
+ startIdleCheckTask();
}
public Server getServer() {
@@ -149,7 +148,6 @@ public class HeaderExchangeServer implements ExchangeServer
{
if (!closed.compareAndSet(false, true)) {
return;
}
- stopHeartbeatTimer();
}
@Override
@@ -210,14 +208,13 @@ public class HeaderExchangeServer implements
ExchangeServer {
int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat);
int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h *
3);
if (t < h * 2) {
- throw new IllegalStateException("heartbeatTimeout <
heartbeatInterval * 2");
+ throw new IllegalStateException("idleTimeout <
heartbeatInterval * 2");
}
- if (h != heartbeat || t != heartbeatTimeout) {
+ if (h != heartbeat || t != idleTimeout) {
heartbeat = h;
- heartbeatTimeout = t;
+ idleTimeout = t;
- stopHeartbeatTimer();
- startHeartbeatTimer();
+ startIdleCheckTask();
}
}
} catch (Throwable t) {
@@ -260,28 +257,14 @@ public class HeaderExchangeServer implements
ExchangeServer {
}
}
- private void startHeartbeatTimer() {
- long tickDuration = calculateLeastDuration(heartbeat);
- heartbeatTimer = new HashedWheelTimer(new
NamedThreadFactory("dubbo-server-heartbeat", true), tickDuration,
- TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
-
+ private void startIdleCheckTask() {
AbstractTimerTask.ChannelProvider cp = () ->
unmodifiableCollection(HeaderExchangeServer.this.getChannels());
- long heartbeatTick = calculateLeastDuration(heartbeat);
- long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
- HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp,
heartbeatTick, heartbeat);
- ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp,
heartbeatTimeoutTick, heartbeatTimeout);
+ long idleTimeoutTick = calculateLeastDuration(idleTimeout);
+ CloseTimerTask closeTimerTask = new CloseTimerTask(cp,
idleTimeoutTick, idleTimeout);
// init task and start timer.
- heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick,
TimeUnit.MILLISECONDS);
- heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick,
TimeUnit.MILLISECONDS);
- }
-
- private void stopHeartbeatTimer() {
- if (heartbeatTimer != null) {
- heartbeatTimer.stop();
- heartbeatTimer = null;
- }
+ idleCheckTimer.newTimeout(closeTimerTask, idleTimeoutTick,
TimeUnit.MILLISECONDS);
}
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java
index 2b7dca5..3c0e938 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java
@@ -29,35 +29,30 @@ public class ReconnectTimerTask extends AbstractTimerTask {
private static final Logger logger =
LoggerFactory.getLogger(ReconnectTimerTask.class);
- private final int heartbeatTimeout;
+ private final int idleTimeout;
- ReconnectTimerTask(ChannelProvider channelProvider, Long
heartbeatTimeoutTick, int heartbeatTimeout1) {
+ public ReconnectTimerTask(ChannelProvider channelProvider, Long
heartbeatTimeoutTick, int idleTimeout) {
super(channelProvider, heartbeatTimeoutTick);
- this.heartbeatTimeout = heartbeatTimeout1;
+ this.idleTimeout = idleTimeout;
}
@Override
protected void doTask(Channel channel) {
- Long lastRead = lastRead(channel);
- Long now = now();
- if (lastRead != null && now - lastRead > heartbeatTimeout) {
- if (channel instanceof Client) {
+ try {
+ Long lastRead = lastRead(channel);
+ Long now = now();
+ // check pong at client
+ if (lastRead != null && now - lastRead > idleTimeout) {
+ logger.warn("Close channel " + channel + ", because heartbeat
read idle time out: "
+ + idleTimeout + "ms");
try {
- logger.warn("Reconnect to remote channel " +
channel.getRemoteAddress() + ", because heartbeat read idle time out: "
- + heartbeatTimeout + "ms");
((Client) channel).reconnect();
- } catch (Throwable t) {
- // do nothing
- }
- } else {
- try {
- logger.warn("Close channel " + channel + ", because
heartbeat read idle time out: "
- + heartbeatTimeout + "ms");
- channel.close();
- } catch (Throwable t) {
- logger.warn("Exception when close channel " + channel, t);
+ } catch (Exception e) {
+ logger.error(channel + "reconnect failed during idle
time.", e);
}
}
+ } catch (Throwable t) {
+ logger.warn("Exception when reconnect to remote channel " +
channel.getRemoteAddress(), t);
}
}
}