This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 3ec4f889f5 refactor: inspect code of exchange classes. (#10332)
3ec4f889f5 is described below
commit 3ec4f889f55445948fb11231092f3b763a26b4f4
Author: stone lion <[email protected]>
AuthorDate: Sun Jul 17 21:02:23 2022 +0800
refactor: inspect code of exchange classes. (#10332)
---
.../apache/dubbo/remoting/exchange/Request.java | 14 +++++------
.../remoting/exchange/support/DefaultFuture.java | 25 +++++++++++++-------
.../exchange/support/header/CloseTimerTask.java | 3 +--
.../support/header/HeaderExchangeServer.java | 27 ++++++----------------
4 files changed, 31 insertions(+), 38 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Request.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Request.java
index f115de2141..d75a21a323 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Request.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Request.java
@@ -58,14 +58,12 @@ public class Request {
if (data == null) {
return null;
}
- String dataStr;
+
try {
- dataStr = data.toString();
+ return data.toString();
} catch (Throwable e) {
- dataStr = "<Fail toString of " + data.getClass() + ", cause: " +
- StringUtils.toString(e) + ">";
+ return "<Fail toString of " + data.getClass() + ", cause: " +
StringUtils.toString(e) + ">";
}
- return dataStr;
}
public long getId() {
@@ -137,7 +135,7 @@ public class Request {
return copy;
}
- public Request copyWithoutData(){
+ public Request copyWithoutData() {
Request copy = new Request(mId);
copy.mVersion = this.mVersion;
copy.mTwoWay = this.mTwoWay;
@@ -148,7 +146,7 @@ public class Request {
@Override
public String toString() {
- return "Request [id=" + mId + ", version=" + mVersion + ", twoway=" +
mTwoWay + ", event=" + mEvent
- + ", broken=" + mBroken + ", data=" + (mData == this ? "this"
: safeToString(mData)) + "]";
+ return "Request [id=" + mId + ", version=" + mVersion + ", twoWay=" +
mTwoWay + ", event=" + mEvent
+ + ", broken=" + mBroken + ", data=" + (mData == this ? "this" :
safeToString(mData)) + "]";
}
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
index f1f0ed19fc..d2edbbe15d 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
@@ -49,21 +49,31 @@ public class DefaultFuture extends
CompletableFuture<Object> {
private static final Logger logger =
LoggerFactory.getLogger(DefaultFuture.class);
+ /**
+ * in-flight channels
+ */
private static final Map<Long, Channel> CHANNELS = new
ConcurrentHashMap<>();
+ /**
+ * in-flight requests
+ */
private static final Map<Long, DefaultFuture> FUTURES = new
ConcurrentHashMap<>();
- private static GlobalResourceInitializer<Timer> TIME_OUT_TIMER = new
GlobalResourceInitializer<>(() -> new HashedWheelTimer(
- new NamedThreadFactory("dubbo-future-timeout", true), 30,
TimeUnit.MILLISECONDS),
- () -> destroy());
+ private static final GlobalResourceInitializer<Timer> TIME_OUT_TIMER = new
GlobalResourceInitializer<>(() -> new HashedWheelTimer(new
NamedThreadFactory("dubbo-future-timeout", true), 30, TimeUnit.MILLISECONDS),
DefaultFuture::destroy);
// invoke id.
private final Long id;
+
private final Channel channel;
+
private final Request request;
+
private final int timeout;
+
private final long start = System.currentTimeMillis();
+
private volatile long sent;
+
private Timeout timeoutCheckTask;
private ExecutorService executor;
@@ -95,7 +105,7 @@ public class DefaultFuture extends CompletableFuture<Object>
{
}
public static void destroy() {
- TIME_OUT_TIMER.remove(timer-> timer.stop());
+ TIME_OUT_TIMER.remove(Timer::stop);
FUTURES.clear();
CHANNELS.clear();
}
@@ -151,9 +161,8 @@ public class DefaultFuture extends
CompletableFuture<Object> {
Response disconnectResponse = new Response(future.getId());
disconnectResponse.setStatus(Response.CHANNEL_INACTIVE);
disconnectResponse.setErrorMessage("Channel " +
- channel +
- " is inactive. Directly return the unFinished
request : " +
- (logger.isDebugEnabled() ? future.getRequest() :
future.getRequest().copyWithoutData()));
+ channel + " is inactive. Directly return the
unFinished request : " +
+ (logger.isDebugEnabled() ? future.getRequest() :
future.getRequest().copyWithoutData()));
DefaultFuture.received(channel, disconnectResponse);
}
}
@@ -214,7 +223,7 @@ public class DefaultFuture extends
CompletableFuture<Object> {
this.completeExceptionally(new RemotingException(channel,
res.getErrorMessage()));
}
- // the result is returning, but the caller thread may still waiting
+ // the result is returning, but the caller thread may still wait
// to avoid endless waiting for whatever reason, notify caller thread
to return.
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor)
executor;
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/CloseTimerTask.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/CloseTimerTask.java
index 4081c309d0..c4c8dc7531 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/CloseTimerTask.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/CloseTimerTask.java
@@ -44,8 +44,7 @@ public class CloseTimerTask extends AbstractTimerTask {
// check ping & pong at server
if ((lastRead != null && now - lastRead > idleTimeout)
|| (lastWrite != null && now - lastWrite > idleTimeout)) {
- logger.warn("Close channel " + channel + ", because idleCheck
timeout: "
- + idleTimeout + "ms");
+ logger.warn("Close channel " + channel + ", because idleCheck
timeout: " + idleTimeout + "ms");
channel.close();
}
} catch (Throwable t) {
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
index 49683b8320..37214cf29f 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
@@ -58,12 +58,10 @@ public class HeaderExchangeServer implements ExchangeServer
{
protected final Logger logger = LoggerFactory.getLogger(getClass());
private final RemotingServer server;
- private AtomicBoolean closed = new AtomicBoolean(false);
- public static GlobalResourceInitializer<HashedWheelTimer> IDLE_CHECK_TIMER
= new GlobalResourceInitializer<>(() ->
- new HashedWheelTimer(new NamedThreadFactory("dubbo-server-idleCheck",
true), 1,
- TimeUnit.SECONDS, TICKS_PER_WHEEL),
- timer -> timer.stop());
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ public static GlobalResourceInitializer<HashedWheelTimer> IDLE_CHECK_TIMER
= new GlobalResourceInitializer<>(() -> new HashedWheelTimer(new
NamedThreadFactory("dubbo-server-idleCheck", true), 1, TimeUnit.SECONDS,
TICKS_PER_WHEEL), HashedWheelTimer::stop);
private Timeout closeTimer;
@@ -83,19 +81,9 @@ public class HeaderExchangeServer implements ExchangeServer {
}
private boolean isRunning() {
- Collection<Channel> channels = getChannels();
- for (Channel channel : channels) {
-
- /**
- * If there are any client connections,
- * our server should be running.
- */
-
- if (channel.isConnected()) {
- return true;
- }
- }
- return false;
+ // If there are any client connections,
+ // our server should be running.
+ return getChannels().stream().anyMatch(Channel::isConnected);
}
@Override
@@ -114,13 +102,12 @@ public class HeaderExchangeServer implements
ExchangeServer {
}
startClose();
if (timeout > 0) {
- final long max = timeout;
final long start = System.currentTimeMillis();
if
(getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, true)) {
sendChannelReadOnlyEvent();
}
while (HeaderExchangeServer.this.isRunning()
- && System.currentTimeMillis() - start < max) {
+ && System.currentTimeMillis() - start < (long) timeout) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {