This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 95865b0461 optimize performance. decode in user thread (#11879)
95865b0461 is described below

commit 95865b04616c7d8b4d6b20ac4232b9499006d7eb
Author: icodening <[email protected]>
AuthorDate: Fri Mar 24 11:15:25 2023 +0800

    optimize performance. decode in user thread (#11879)
    
    * optimize performance: decode in user thread
    
    * optimize performance: decode in user thread
    
    * optimize performance: decode in user thread
---
 .../dubbo/common/function/ThrowableSupplier.java   | 27 +++++++++++++++
 .../apache/dubbo/common/utils/ExecutorUtil.java    |  6 ++++
 .../dubbo/rpc/protocol/tri/DeadlineFuture.java     | 11 +++---
 .../dubbo/rpc/protocol/tri/TripleInvoker.java      |  3 ++
 .../dubbo/rpc/protocol/tri/call/ClientCall.java    | 10 ++++--
 .../call/ObserverToClientCallListenerAdapter.java  | 12 ++++---
 .../rpc/protocol/tri/call/TripleClientCall.java    | 21 +++++++-----
 .../protocol/tri/call/TripleMessageProducer.java   | 39 ++++++++++++++++++++++
 .../protocol/tri/call/UnaryClientCallListener.java | 33 ++++++++++--------
 .../dubbo/rpc/protocol/tri/DeadlineFutureTest.java |  2 +-
 10 files changed, 132 insertions(+), 32 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/function/ThrowableSupplier.java b/dubbo-common/src/main/java/org/apache/dubbo/common/function/ThrowableSupplier.java
new file mode 100644
index 0000000000..84befaf9f7
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/function/ThrowableSupplier.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.function;
+
+public interface ThrowableSupplier<T> {
+
+    /**
+     * Gets a result.
+     *
+     * @return a result
+     */
+    T get() throws Throwable;
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java
index 6c0bc148a1..714c8ae4fa 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java
@@ -37,6 +37,8 @@ public class ExecutorUtil {
         new LinkedBlockingQueue<Runnable>(100),
         new NamedThreadFactory("Close-ExecutorService-Timer", true));
 
+    private static final Executor DIRECT_EXECUTOR = Runnable::run;
+
     public static boolean isTerminated(Executor executor) {
         if (executor instanceof ExecutorService) {
             if (((ExecutorService) executor).isTerminated()) {
@@ -135,4 +137,8 @@ public class ExecutorUtil {
             future.cancel(true);
         }
     }
+
+    public static Executor directExecutor() {
+        return DIRECT_EXECUTOR;
+    }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
index f2985759fd..34097752a8 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
@@ -36,6 +36,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 public class DeadlineFuture extends CompletableFuture<AppResponse> {
 
@@ -80,7 +81,7 @@ public class DeadlineFuture extends CompletableFuture<AppResponse> {
         return future;
     }
 
-    public void received(TriRpcStatus status, AppResponse appResponse) {
+    public void received(TriRpcStatus status, Supplier<AppResponse> appResponse) {
         if (status.code != TriRpcStatus.Code.DEADLINE_EXCEEDED) {
             // decrease Time
             if (!timeoutTask.isCancelled()) {
@@ -88,11 +89,13 @@ public class DeadlineFuture extends CompletableFuture<AppResponse> {
             }
         }
         if (getExecutor() != null) {
-            getExecutor().execute(() -> doReceived(status, appResponse));
+            getExecutor().execute(() -> doReceived(status, appResponse.get()));
         } else {
-            doReceived(status, appResponse);
+            doReceived(status, appResponse.get());
         }
-    }    private static final GlobalResourceInitializer<Timer> TIME_OUT_TIMER = new GlobalResourceInitializer<>(
+    }
+
+    private static final GlobalResourceInitializer<Timer> TIME_OUT_TIMER = new GlobalResourceInitializer<>(
         () -> new HashedWheelTimer(new NamedThreadFactory("dubbo-future-timeout", true), 30,
             TimeUnit.MILLISECONDS), DeadlineFuture::destroy);
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index 6c11a1061a..d3e7ec4f48 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -24,6 +24,7 @@ import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.common.utils.ExecutorUtil;
 import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
 import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.AsyncRpcResult;
@@ -132,6 +133,8 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
         try {
             switch (methodDescriptor.getRpcType()) {
                 case UNARY:
+                    call = new TripleClientCall(connectionClient, ExecutorUtil.directExecutor(),
+                        getUrl().getOrDefaultFrameworkModel(), writeQueue);
                     result = invokeUnary(methodDescriptor, invocation, call);
                     break;
                 case SERVER_STREAM:
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
index 4936f54409..a54e7b8df1 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
@@ -43,9 +43,9 @@ public interface ClientCall {
         /**
          * Callback when message received.
          *
-         * @param message message received
+         * @param messageProducer message producer
          */
-        void onMessage(Object message);
+        void onMessage(MessageProducer messageProducer);
 
         /**
          * Callback when call is finished.
@@ -110,4 +110,10 @@ public interface ClientCall {
      */
     void setCompression(String compression);
 
+    interface MessageProducer {
+
+        Object getMessage() throws Throwable;
+
+    }
+
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
index 1c934ce970..40125cdb8d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
@@ -38,10 +38,14 @@ public class ObserverToClientCallListenerAdapter implements ClientCall.Listener
     }
 
     @Override
-    public void onMessage(Object message) {
-        delegate.onNext(message);
-        if (call.isAutoRequest()) {
-            call.request(1);
+    public void onMessage(ClientCall.MessageProducer messageProducer) {
+        try {
+            delegate.onNext(messageProducer.getMessage());
+            if (call.isAutoRequest()) {
+                call.request(1);
+            }
+        } catch (Throwable e) {
+            delegate.onError(e);
         }
     }
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
index f1d0a60f6a..f54cd4844a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
@@ -77,18 +77,23 @@ public class TripleClientCall implements ClientCall, ClientStream.Listener {
             return;
         }
         try {
-            final Object unpacked = requestMetadata.packableMethod.parseResponse(message);
-            listener.onMessage(unpacked);
+            TripleMessageProducer messageProducer = TripleMessageProducer.withSupplier(() ->
+                    requestMetadata.packableMethod.parseResponse(message));
+            listener.onMessage(messageProducer);
         } catch (Throwable t) {
-            TriRpcStatus status = TriRpcStatus.INTERNAL.withDescription("Deserialize response failed")
-                .withCause(t);
-            cancelByLocal(status.asException());
-            listener.onClose(status,null);
-            LOGGER.error(PROTOCOL_FAILED_RESPONSE, "", "", String.format("Failed to deserialize triple response, service=%s, method=%s,connection=%s",
-                    connectionClient, requestMetadata.service, requestMetadata.method.getMethodName()), t);
+            onDeserializeError(t);
         }
     }
 
+    private void onDeserializeError(Throwable t){
+        TriRpcStatus status = TriRpcStatus.INTERNAL.withDescription("Deserialize response failed")
+            .withCause(t);
+        cancelByLocal(status.asException());
+        listener.onClose(status,null);
+        LOGGER.error(PROTOCOL_FAILED_RESPONSE, "", "", String.format("Failed to deserialize triple response, service=%s, method=%s,connection=%s",
+            connectionClient, requestMetadata.service, requestMetadata.method.getMethodName()), t);
+    }
+
     @Override
     public void onCancelByRemote(TriRpcStatus status) {
         if (canceled) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleMessageProducer.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleMessageProducer.java
new file mode 100644
index 0000000000..e40f03cce9
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleMessageProducer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.call;
+
+import org.apache.dubbo.common.function.ThrowableSupplier;
+
+
+class TripleMessageProducer implements ClientCall.MessageProducer {
+
+    private final ThrowableSupplier<Object> throwableSupplier;
+
+    private TripleMessageProducer(ThrowableSupplier<Object> throwableSupplier) {
+        this.throwableSupplier = throwableSupplier;
+    }
+
+    @Override
+    public Object getMessage() throws Throwable {
+        return throwableSupplier.get();
+    }
+
+    public static TripleMessageProducer withSupplier(ThrowableSupplier<Object> supplier) {
+        return new TripleMessageProducer(supplier);
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java
index 2f00c15ac1..2829089fa0 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java
@@ -26,31 +26,38 @@ import java.util.Map;
 public class UnaryClientCallListener implements ClientCall.Listener {
 
     private final DeadlineFuture future;
-    private Object appResponse;
+    private ClientCall.MessageProducer messageProducer;
 
     public UnaryClientCallListener(DeadlineFuture deadlineFuture) {
         this.future = deadlineFuture;
     }
 
     @Override
-    public void onMessage(Object message) {
-        this.appResponse = message;
+    public void onMessage(ClientCall.MessageProducer messageProducer) {
+        this.messageProducer = messageProducer;
     }
 
     @Override
     public void onClose(TriRpcStatus status, Map<String, Object> trailers) {
-        AppResponse result = new AppResponse();
-        result.setObjectAttachments(trailers);
-        if (status.isOk()) {
-            if (appResponse instanceof Exception) {
-                result.setException((Exception) appResponse);
+        future.received(status, () -> {
+            AppResponse result = new AppResponse();
+            result.setObjectAttachments(trailers);
+            if (status.isOk()) {
+                try {
+                    Object appResponse = messageProducer.getMessage();
+                    if (appResponse instanceof Exception) {
+                        result.setException((Exception) appResponse);
+                    } else {
+                        result.setValue(appResponse);
+                    }
+                } catch (Throwable e) {
+                    result.setException(e);
+                }
             } else {
-                result.setValue(appResponse);
+                result.setException(status.asException());
             }
-         } else {
-            result.setException(status.asException());
-        }
-        future.received(status, result);
+            return result;
+        });
     }
 
     @Override
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFutureTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFutureTest.java
index 09d03513eb..615e25938b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFutureTest.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFutureTest.java
@@ -45,7 +45,7 @@ class DeadlineFutureTest {
         DeadlineFuture success = DeadlineFuture.newFuture(service, method, address, 1000,
             ImmediateEventExecutor.INSTANCE);
         AppResponse response = new AppResponse();
-        success.received(TriRpcStatus.OK, response);
+        success.received(TriRpcStatus.OK, () -> response);
         AppResponse response1 = success.get();
         Assertions.assertEquals(response, response1);
     }

Reply via email to