This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 235695ccf8 [Improvement-16870][RPC] Support retry configuration in rpc 
module (#16899)
235695ccf8 is described below

commit 235695ccf84334d55e46b2aa8c4fe5210f11b1fe
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Dec 19 09:24:52 2024 +0800

    [Improvement-16870][RPC] Support retry configuration in rpc module (#16899)
---
 .../dolphinscheduler/extract/base/RpcMethod.java   |   2 +
 ...{RpcMethod.java => RpcMethodRetryStrategy.java} |  19 +++-
 .../extract/base/SyncRequestDto.java               |   4 +
 .../base/client/ClientInvocationHandler.java       |   9 +-
 .../extract/base/client/NettyRemotingClient.java   | 112 ++++++++++++---------
 .../base/client/SyncClientMethodInvoker.java       |   9 +-
 ...tException.java => RemoteTimeoutException.java} |   8 +-
 .../extract/base/exception/RemotingException.java  |  58 -----------
 .../exception/RemotingTooMuchRequestException.java |  28 ------
 .../extract/base/future/ResponseFuture.java        |   1 -
 .../base/metrics/ClientSyncExceptionMetrics.java   |  17 ++--
 .../extract/base/metrics/RpcMetrics.java           |   4 +-
 .../extract/base/protocal/TransporterEncoder.java  |   6 +-
 .../extract/base/client/ClientsTest.java           |  21 +++-
 14 files changed, 137 insertions(+), 161 deletions(-)

diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
index 2dcf689ad8..77c9d707ad 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
@@ -30,4 +30,6 @@ public @interface RpcMethod {
 
     long timeout() default -1;
 
+    RpcMethodRetryStrategy retry() default @RpcMethodRetryStrategy;
+
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethodRetryStrategy.java
similarity index 65%
copy from 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
copy to 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethodRetryStrategy.java
index 2dcf689ad8..5235ba55f2 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethodRetryStrategy.java
@@ -22,12 +22,27 @@ import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+import java.net.ConnectException;
 
 @Target(ElementType.METHOD)
 @Retention(RetentionPolicy.RUNTIME)
 @Documented
-public @interface RpcMethod {
+public @interface RpcMethodRetryStrategy {
 
-    long timeout() default -1;
+    /**
+     * The maximum number of retries. Default is 3, which means that the 
method is retried at most 3 times, including the first call.
+     */
+    int maxRetryTimes() default 3;
+
+    /**
+     * The interval between retries, in milliseconds. If the value is less 
than or equal to 0, no interval is set.
+     */
+    long retryInterval() default 0;
+
+    /**
+     * Which exception to retry.
+     * <p> Default is {@link ConnectException}.
+     */
+    Class<? extends Throwable>[] retryFor() default {ConnectException.class};
 
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/SyncRequestDto.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/SyncRequestDto.java
index 7df649cd57..b9ae76536b 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/SyncRequestDto.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/SyncRequestDto.java
@@ -32,7 +32,11 @@ import lombok.NoArgsConstructor;
 public class SyncRequestDto {
 
     private Host serverHost;
+
     private Transporter transporter;
+
     private long timeoutMillis;
 
+    private RpcMethodRetryStrategy retryStrategy;
+
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java
index 41ec3e056d..3bf723a577 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.extract.base.utils.Host;
 
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -51,7 +52,13 @@ class ClientInvocationHandler implements InvocationHandler {
         }
         ClientMethodInvoker methodInvoker = methodInvokerMap.computeIfAbsent(
                 method.toGenericString(), m -> new 
SyncClientMethodInvoker(serverHost, method, nettyRemotingClient));
-        return methodInvoker.invoke(proxy, method, args);
+        try {
+            return methodInvoker.invoke(proxy, method, args);
+        } catch (UndeclaredThrowableException undeclaredThrowableException) {
+            throw undeclaredThrowableException.getCause();
+        } catch (Throwable throwable) {
+            throw throwable;
+        }
     }
 
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
index 4aea4d6dfe..0d1f7a0f21 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
@@ -19,10 +19,11 @@ package org.apache.dolphinscheduler.extract.base.client;
 
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.extract.base.IRpcResponse;
+import org.apache.dolphinscheduler.extract.base.RpcMethodRetryStrategy;
 import org.apache.dolphinscheduler.extract.base.SyncRequestDto;
 import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
-import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
-import 
org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException;
+import org.apache.dolphinscheduler.extract.base.exception.RemoteException;
+import 
org.apache.dolphinscheduler.extract.base.exception.RemoteTimeoutException;
 import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
 import 
org.apache.dolphinscheduler.extract.base.metrics.ClientSyncDurationMetrics;
 import 
org.apache.dolphinscheduler.extract.base.metrics.ClientSyncExceptionMetrics;
@@ -34,6 +35,7 @@ import org.apache.dolphinscheduler.extract.base.utils.Host;
 import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
 
 import java.net.InetSocketAddress;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadFactory;
@@ -110,59 +112,79 @@ public class NettyRemotingClient implements AutoCloseable 
{
         isStarted.compareAndSet(false, true);
     }
 
-    public IRpcResponse sendSync(SyncRequestDto syncRequestDto) throws 
RemotingException {
-        long start = System.currentTimeMillis();
-
+    public IRpcResponse sendSync(final SyncRequestDto syncRequestDto) throws 
RemoteException {
         final Host host = syncRequestDto.getServerHost();
         final Transporter transporter = syncRequestDto.getTransporter();
-        final long timeoutMillis = syncRequestDto.getTimeoutMillis() < 0 ? 
clientConfig.getConnectTimeoutMillis()
+        final long timeoutMillis = syncRequestDto.getTimeoutMillis() < 0 ? 
clientConfig.getDefaultRpcTimeoutMillis()
                 : syncRequestDto.getTimeoutMillis();
-        final long opaque = transporter.getHeader().getOpaque();
 
-        try {
-            final Channel channel = getOrCreateChannel(host);
-            if (channel == null) {
-                throw new RemotingException(String.format("connect to : %s 
fail", host));
-            }
-            final ResponseFuture responseFuture = new ResponseFuture(opaque, 
timeoutMillis);
-            channel.writeAndFlush(transporter).addListener(future -> {
-                if (future.isSuccess()) {
-                    responseFuture.setSendOk(true);
-                    return;
-                } else {
-                    responseFuture.setSendOk(false);
+        final RpcMethodRetryStrategy retryStrategy = 
syncRequestDto.getRetryStrategy();
+
+        int maxRetryTimes = retryStrategy.maxRetryTimes();
+        int currentExecuteTimes = 1;
+
+        while (true) {
+            final long start = System.currentTimeMillis();
+            try {
+                return doSendSync(transporter, host, timeoutMillis);
+            } catch (Exception ex) {
+                ClientSyncExceptionMetrics clientSyncExceptionMetrics =
+                        ClientSyncExceptionMetrics.of(syncRequestDto, ex);
+                
RpcMetrics.recordClientSyncRequestException(clientSyncExceptionMetrics);
+
+                if (currentExecuteTimes < maxRetryTimes
+                        && Arrays.stream(retryStrategy.retryFor()).anyMatch(e 
-> e.isInstance(ex))) {
+                    currentExecuteTimes++;
+                    if (retryStrategy.retryInterval() > 0) {
+                        ThreadUtils.sleep(retryStrategy.retryInterval());
+                    }
+                    continue;
                 }
-                responseFuture.setCause(future.cause());
-                responseFuture.putResponse(null);
-                log.error("Send Sync request {} to host {} failed", 
transporter, host, responseFuture.getCause());
-            });
-            /*
-             * sync wait for result
-             */
-            IRpcResponse iRpcResponse = responseFuture.waitResponse();
-            if (iRpcResponse == null) {
-                if (responseFuture.isSendOK()) {
-                    throw new RemotingTimeoutException(host.toString(), 
timeoutMillis, responseFuture.getCause());
+
+                if (ex instanceof RemoteException) {
+                    throw (RemoteException) ex;
                 } else {
-                    throw new RemotingException(host.toString(), 
responseFuture.getCause());
+                    throw new RemoteException("Call method to " + host + " 
failed", ex);
                 }
+            } finally {
+                ClientSyncDurationMetrics clientSyncDurationMetrics = 
ClientSyncDurationMetrics
+                        .of(syncRequestDto)
+                        .withMilliseconds(System.currentTimeMillis() - start);
+                
RpcMetrics.recordClientSyncRequestDuration(clientSyncDurationMetrics);
             }
-            return iRpcResponse;
-        } catch (Exception ex) {
-            ClientSyncExceptionMetrics clientSyncExceptionMetrics = 
ClientSyncExceptionMetrics
-                    .of(syncRequestDto)
-                    .withThrowable(ex);
-            
RpcMetrics.recordClientSyncRequestException(clientSyncExceptionMetrics);
-            if (ex instanceof RemotingException) {
-                throw (RemotingException) ex;
+        }
+    }
+
+    private IRpcResponse doSendSync(final Transporter transporter,
+                                    final Host serverHost,
+                                    long timeoutMills) throws RemoteException, 
InterruptedException {
+        final Channel channel = getOrCreateChannel(serverHost);
+        if (channel == null) {
+            throw new RemoteException(String.format("connect to : %s fail", 
serverHost));
+        }
+        final ResponseFuture responseFuture = new 
ResponseFuture(transporter.getHeader().getOpaque(), timeoutMills);
+        channel.writeAndFlush(transporter).addListener(future -> {
+            if (future.isSuccess()) {
+                responseFuture.setSendOk(true);
+                return;
             } else {
-                throw new RemotingException(ex);
+                responseFuture.setSendOk(false);
             }
-        } finally {
-            ClientSyncDurationMetrics clientSyncDurationMetrics = 
ClientSyncDurationMetrics
-                    .of(syncRequestDto)
-                    .withMilliseconds(System.currentTimeMillis() - start);
-            
RpcMetrics.recordClientSyncRequestDuration(clientSyncDurationMetrics);
+            responseFuture.setCause(future.cause());
+            responseFuture.putResponse(null);
+            log.error("Send Sync request {} to host {} failed", transporter, 
serverHost, responseFuture.getCause());
+        });
+        /*
+         * sync wait for result
+         */
+        final IRpcResponse iRpcResponse = responseFuture.waitResponse();
+        if (iRpcResponse != null) {
+            return iRpcResponse;
+        }
+        if (responseFuture.isSendOK()) {
+            throw new RemoteTimeoutException(serverHost.toString(), 
timeoutMills, responseFuture.getCause());
+        } else {
+            throw new RemoteException(serverHost.toString(), 
responseFuture.getCause());
         }
     }
 
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
index ccbdad945b..5a09af2c56 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
@@ -38,12 +38,13 @@ class SyncClientMethodInvoker extends 
AbstractClientMethodInvoker {
     @Override
     public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
         RpcMethod sync = method.getAnnotation(RpcMethod.class);
-        Transporter transporter = new Transporter();
-        
transporter.setBody(JsonSerializer.serialize(StandardRpcRequest.of(args)));
-        transporter.setHeader(TransporterHeader.of(methodIdentifier));
+        final Transporter transporter = Transporter.of(
+                TransporterHeader.of(methodIdentifier),
+                JsonSerializer.serialize(StandardRpcRequest.of(args)));
 
-        SyncRequestDto syncRequestDto = SyncRequestDto.builder()
+        final SyncRequestDto syncRequestDto = SyncRequestDto.builder()
                 .timeoutMillis(sync.timeout())
+                .retryStrategy(sync.retry())
                 .transporter(transporter)
                 .serverHost(serverHost)
                 .build();
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemotingTimeoutException.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemoteTimeoutException.java
similarity index 79%
rename from 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemotingTimeoutException.java
rename to 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemoteTimeoutException.java
index 22b77bf1f5..abbd374250 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemotingTimeoutException.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemoteTimeoutException.java
@@ -20,17 +20,17 @@ package org.apache.dolphinscheduler.extract.base.exception;
 /**
  *  timeout exception
  */
-public class RemotingTimeoutException extends RemotingException {
+public class RemoteTimeoutException extends RemoteException {
 
-    public RemotingTimeoutException(String message) {
+    public RemoteTimeoutException(String message) {
         super(message);
     }
 
-    public RemotingTimeoutException(String address, long timeoutMillis) {
+    public RemoteTimeoutException(String address, long timeoutMillis) {
         this(address, timeoutMillis, null);
     }
 
-    public RemotingTimeoutException(String address, long timeoutMillis, 
Throwable cause) {
+    public RemoteTimeoutException(String address, long timeoutMillis, 
Throwable cause) {
         super(String.format("wait response on the channel %s timeout %s", 
address, timeoutMillis), cause);
     }
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemotingException.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemotingException.java
deleted file mode 100644
index 4290362a01..0000000000
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemotingException.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.extract.base.exception;
-
-/**
- *  remote exception
- */
-public class RemotingException extends Exception {
-
-    public RemotingException() {
-        super();
-    }
-
-    /**
-     * Construct a new runtime exception with the detail message
-     *
-     * @param   message  detail message
-     */
-    public RemotingException(String message) {
-        super(message);
-    }
-
-    /**
-     * Construct a new runtime exception with the detail message and cause
-     *
-     * @param  message the detail message
-     * @param  cause the cause
-     * @since  1.4
-     */
-    public RemotingException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    /**
-     * Construct a new runtime exception with throwable
-     *
-     * @param  cause the cause
-     */
-    public RemotingException(Throwable cause) {
-        super(cause);
-    }
-
-}
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemotingTooMuchRequestException.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemotingTooMuchRequestException.java
deleted file mode 100644
index 4a6eb0f46f..0000000000
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemotingTooMuchRequestException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.extract.base.exception;
-
-/**
- *  too much request exception
- */
-public class RemotingTooMuchRequestException extends RemotingException {
-
-    public RemotingTooMuchRequestException(String message) {
-        super(message);
-    }
-}
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java
index 1fbbd9ed6c..51742323eb 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java
@@ -36,7 +36,6 @@ public class ResponseFuture {
 
     private final long opaque;
 
-    // remove the timeout
     private final long timeoutMillis;
 
     private final CountDownLatch latch = new CountDownLatch(1);
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/ClientSyncExceptionMetrics.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/ClientSyncExceptionMetrics.java
index 6f91132547..331cd4876a 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/ClientSyncExceptionMetrics.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/ClientSyncExceptionMetrics.java
@@ -34,23 +34,18 @@ public class ClientSyncExceptionMetrics {
 
     private Transporter transporter;
 
-    private String clientHost;
-
     @Builder.Default
-    private String serverHost = NetUtils.getHost();
+    private String clientHost = NetUtils.getHost();
+
+    private String serverAddress;
 
     private Throwable throwable;
 
-    public static ClientSyncExceptionMetrics of(SyncRequestDto syncRequestDto) 
{
+    public static ClientSyncExceptionMetrics of(final SyncRequestDto 
syncRequestDto, final Throwable throwable) {
         return ClientSyncExceptionMetrics.builder()
                 .transporter(syncRequestDto.getTransporter())
+                .serverAddress(syncRequestDto.getServerHost().getAddress())
+                .throwable(throwable)
                 .build();
-
     }
-
-    public ClientSyncExceptionMetrics withThrowable(Throwable throwable) {
-        this.throwable = throwable;
-        return this;
-    }
-
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/RpcMetrics.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/RpcMetrics.java
index c2dbbfefa6..8ee06e32dd 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/RpcMetrics.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/RpcMetrics.java
@@ -42,9 +42,9 @@ public class RpcMetrics {
                         .map(ClientSyncExceptionMetrics::getTransporter)
                         .map(Transporter::getHeader)
                         .map(TransporterHeader::getMethodIdentifier)
-                        .orElseGet(() -> "unknown"),
+                        .orElse("unknown"),
                 clientSyncExceptionMetrics.getClientHost(),
-                clientSyncExceptionMetrics.getServerHost());
+                clientSyncExceptionMetrics.getServerAddress());
     }
 
     public static void recordClientSyncRequestException(final Throwable 
throwable,
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/protocal/TransporterEncoder.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/protocal/TransporterEncoder.java
index 246a313b6c..ca9990d9c4 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/protocal/TransporterEncoder.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/protocal/TransporterEncoder.java
@@ -17,7 +17,7 @@
 
 package org.apache.dolphinscheduler.extract.base.protocal;
 
-import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
+import org.apache.dolphinscheduler.extract.base.exception.RemoteException;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandler.Sharable;
@@ -28,9 +28,9 @@ import io.netty.handler.codec.MessageToByteEncoder;
 public class TransporterEncoder extends MessageToByteEncoder<Transporter> {
 
     @Override
-    protected void encode(ChannelHandlerContext ctx, Transporter transporter, 
ByteBuf out) throws Exception {
+    protected void encode(ChannelHandlerContext ctx, Transporter transporter, 
ByteBuf out) {
         if (transporter == null) {
-            throw new RemotingException("encode msg is null");
+            throw new RemoteException("encode msg is null");
         }
         out.writeByte(Transporter.MAGIC);
         out.writeByte(Transporter.VERSION);
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/ClientsTest.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/ClientsTest.java
index 72a3889b13..cf1e60dead 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/ClientsTest.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/ClientsTest.java
@@ -19,16 +19,22 @@ package org.apache.dolphinscheduler.extract.base.client;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import org.apache.dolphinscheduler.extract.base.RpcMethod;
+import org.apache.dolphinscheduler.extract.base.RpcMethodRetryStrategy;
 import org.apache.dolphinscheduler.extract.base.RpcService;
 import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
 import 
org.apache.dolphinscheduler.extract.base.exception.MethodInvocationException;
+import org.apache.dolphinscheduler.extract.base.exception.RemoteException;
 import 
org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
 
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.commons.lang3.StringUtils;
 
+import java.time.Duration;
+
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -61,6 +67,17 @@ public class ClientsTest {
         Assertions.assertNotNull(proxyClient);
     }
 
+    @Test
+    public void testPingWithRetry() {
+        IService proxyClient = Clients
+                .withService(IService.class)
+                .withHost("localhost:1");
+        Awaitility.await()
+                .atMost(Duration.ofMinutes(1))
+                .atLeast(Duration.ofSeconds(3))
+                .untilAsserted(() -> assertThrows(RemoteException.class, () -> 
proxyClient.ping("ping")));
+    }
+
     @Test
     public void testPing() {
         IService proxyClient = Clients
@@ -69,7 +86,7 @@ public class ClientsTest {
         assertEquals("pong", proxyClient.ping("ping"));
 
         MethodInvocationException methodInvocationException =
-                Assertions.assertThrows(MethodInvocationException.class, () -> 
proxyClient.ping(null));
+                assertThrows(MethodInvocationException.class, () -> 
proxyClient.ping(null));
         assertEquals("ping: null is illegal", 
methodInvocationException.getMessage());
     }
 
@@ -89,7 +106,7 @@ public class ClientsTest {
     @RpcService
     public interface IService {
 
-        @RpcMethod
+        @RpcMethod(retry = @RpcMethodRetryStrategy(maxRetryTimes = 4, 
retryInterval = 1_000))
         String ping(String ping);
 
         @RpcMethod

Reply via email to