This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 3ec4f889f5 refactor: inspect code of exchange classes. (#10332)
3ec4f889f5 is described below

commit 3ec4f889f55445948fb11231092f3b763a26b4f4
Author: stone lion <[email protected]>
AuthorDate: Sun Jul 17 21:02:23 2022 +0800

    refactor: inspect code of exchange classes. (#10332)
---
 .../apache/dubbo/remoting/exchange/Request.java    | 14 +++++------
 .../remoting/exchange/support/DefaultFuture.java   | 25 +++++++++++++-------
 .../exchange/support/header/CloseTimerTask.java    |  3 +--
 .../support/header/HeaderExchangeServer.java       | 27 ++++++----------------
 4 files changed, 31 insertions(+), 38 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Request.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Request.java
index f115de2141..d75a21a323 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Request.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Request.java
@@ -58,14 +58,12 @@ public class Request {
         if (data == null) {
             return null;
         }
-        String dataStr;
+
         try {
-            dataStr = data.toString();
+            return data.toString();
         } catch (Throwable e) {
-            dataStr = "<Fail toString of " + data.getClass() + ", cause: " +
-                    StringUtils.toString(e) + ">";
+            return "<Fail toString of " + data.getClass() + ", cause: " + StringUtils.toString(e) + ">";
         }
-        return dataStr;
     }
 
     public long getId() {
@@ -137,7 +135,7 @@ public class Request {
         return copy;
     }
 
-    public Request copyWithoutData(){
+    public Request copyWithoutData() {
         Request copy = new Request(mId);
         copy.mVersion = this.mVersion;
         copy.mTwoWay = this.mTwoWay;
@@ -148,7 +146,7 @@ public class Request {
 
     @Override
     public String toString() {
-        return "Request [id=" + mId + ", version=" + mVersion + ", twoway=" + mTwoWay + ", event=" + mEvent
-                + ", broken=" + mBroken + ", data=" + (mData == this ? "this" : safeToString(mData)) + "]";
+        return "Request [id=" + mId + ", version=" + mVersion + ", twoWay=" + mTwoWay + ", event=" + mEvent
+            + ", broken=" + mBroken + ", data=" + (mData == this ? "this" : safeToString(mData)) + "]";
     }
 }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
index f1f0ed19fc..d2edbbe15d 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
@@ -49,21 +49,31 @@ public class DefaultFuture extends CompletableFuture<Object> {
 
     private static final Logger logger = LoggerFactory.getLogger(DefaultFuture.class);
 
+    /**
+     * in-flight channels
+     */
     private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();
 
+    /**
+     * in-flight requests
+     */
     private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
 
-    private static GlobalResourceInitializer<Timer> TIME_OUT_TIMER = new GlobalResourceInitializer<>(() -> new HashedWheelTimer(
-        new NamedThreadFactory("dubbo-future-timeout", true), 30, TimeUnit.MILLISECONDS),
-        () -> destroy());
+    private static final GlobalResourceInitializer<Timer> TIME_OUT_TIMER = new GlobalResourceInitializer<>(() -> new HashedWheelTimer(new NamedThreadFactory("dubbo-future-timeout", true), 30, TimeUnit.MILLISECONDS), DefaultFuture::destroy);
 
     // invoke id.
     private final Long id;
+
     private final Channel channel;
+
     private final Request request;
+
     private final int timeout;
+
     private final long start = System.currentTimeMillis();
+
     private volatile long sent;
+
     private Timeout timeoutCheckTask;
 
     private ExecutorService executor;
@@ -95,7 +105,7 @@ public class DefaultFuture extends CompletableFuture<Object> {
     }
 
     public static void destroy() {
-        TIME_OUT_TIMER.remove(timer-> timer.stop());
+        TIME_OUT_TIMER.remove(Timer::stop);
         FUTURES.clear();
         CHANNELS.clear();
     }
@@ -151,9 +161,8 @@ public class DefaultFuture extends CompletableFuture<Object> {
                     Response disconnectResponse = new Response(future.getId());
                     disconnectResponse.setStatus(Response.CHANNEL_INACTIVE);
                     disconnectResponse.setErrorMessage("Channel " +
-                            channel +
-                            " is inactive. Directly return the unFinished request : " +
-                            (logger.isDebugEnabled() ? future.getRequest() : future.getRequest().copyWithoutData()));
+                        channel + " is inactive. Directly return the unFinished request : " +
+                        (logger.isDebugEnabled() ? future.getRequest() : future.getRequest().copyWithoutData()));
                     DefaultFuture.received(channel, disconnectResponse);
                 }
             }
@@ -214,7 +223,7 @@ public class DefaultFuture extends CompletableFuture<Object> {
             this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
         }
 
-        // the result is returning, but the caller thread may still waiting
+        // the result is returning, but the caller thread may still wait
         // to avoid endless waiting for whatever reason, notify caller thread to return.
         if (executor != null && executor instanceof ThreadlessExecutor) {
             ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/CloseTimerTask.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/CloseTimerTask.java
index 4081c309d0..c4c8dc7531 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/CloseTimerTask.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/CloseTimerTask.java
@@ -44,8 +44,7 @@ public class CloseTimerTask extends AbstractTimerTask {
             // check ping & pong at server
             if ((lastRead != null && now - lastRead > idleTimeout)
                     || (lastWrite != null && now - lastWrite > idleTimeout)) {
-                logger.warn("Close channel " + channel + ", because idleCheck timeout: "
-                        + idleTimeout + "ms");
+                logger.warn("Close channel " + channel + ", because idleCheck timeout: " + idleTimeout + "ms");
                 channel.close();
             }
         } catch (Throwable t) {
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
index 49683b8320..37214cf29f 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
@@ -58,12 +58,10 @@ public class HeaderExchangeServer implements ExchangeServer {
     protected final Logger logger = LoggerFactory.getLogger(getClass());
 
     private final RemotingServer server;
-    private AtomicBoolean closed = new AtomicBoolean(false);
 
-    public static GlobalResourceInitializer<HashedWheelTimer> IDLE_CHECK_TIMER = new GlobalResourceInitializer<>(() ->
-        new HashedWheelTimer(new NamedThreadFactory("dubbo-server-idleCheck", true), 1,
-            TimeUnit.SECONDS, TICKS_PER_WHEEL),
-        timer -> timer.stop());
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    public static GlobalResourceInitializer<HashedWheelTimer> IDLE_CHECK_TIMER = new GlobalResourceInitializer<>(() -> new HashedWheelTimer(new NamedThreadFactory("dubbo-server-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL), HashedWheelTimer::stop);
 
     private Timeout closeTimer;
 
@@ -83,19 +81,9 @@ public class HeaderExchangeServer implements ExchangeServer {
     }
 
     private boolean isRunning() {
-        Collection<Channel> channels = getChannels();
-        for (Channel channel : channels) {
-
-            /**
-             *  If there are any client connections,
-             *  our server should be running.
-             */
-
-            if (channel.isConnected()) {
-                return true;
-            }
-        }
-        return false;
+        // If there are any client connections,
+        // our server should be running.
+        return getChannels().stream().anyMatch(Channel::isConnected);
     }
 
     @Override
@@ -114,13 +102,12 @@ public class HeaderExchangeServer implements ExchangeServer {
         }
         startClose();
         if (timeout > 0) {
-            final long max = timeout;
             final long start = System.currentTimeMillis();
             if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, true)) {
                 sendChannelReadOnlyEvent();
             }
             while (HeaderExchangeServer.this.isRunning()
-                    && System.currentTimeMillis() - start < max) {
+                && System.currentTimeMillis() - start < (long) timeout) {
                 try {
                     Thread.sleep(10);
                 } catch (InterruptedException e) {

Reply via email to