This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 235695ccf8 [Improvement-16870][RPC] Support retry configuration in rpc
module (#16899)
235695ccf8 is described below
commit 235695ccf84334d55e46b2aa8c4fe5210f11b1fe
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Dec 19 09:24:52 2024 +0800
[Improvement-16870][RPC] Support retry configuration in rpc module (#16899)
---
.../dolphinscheduler/extract/base/RpcMethod.java | 2 +
...{RpcMethod.java => RpcMethodRetryStrategy.java} | 19 +++-
.../extract/base/SyncRequestDto.java | 4 +
.../base/client/ClientInvocationHandler.java | 9 +-
.../extract/base/client/NettyRemotingClient.java | 112 ++++++++++++---------
.../base/client/SyncClientMethodInvoker.java | 9 +-
...tException.java => RemoteTimeoutException.java} | 8 +-
.../extract/base/exception/RemotingException.java | 58 -----------
.../exception/RemotingTooMuchRequestException.java | 28 ------
.../extract/base/future/ResponseFuture.java | 1 -
.../base/metrics/ClientSyncExceptionMetrics.java | 17 ++--
.../extract/base/metrics/RpcMetrics.java | 4 +-
.../extract/base/protocal/TransporterEncoder.java | 6 +-
.../extract/base/client/ClientsTest.java | 21 +++-
14 files changed, 137 insertions(+), 161 deletions(-)
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
index 2dcf689ad8..77c9d707ad 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
@@ -30,4 +30,6 @@ public @interface RpcMethod {
long timeout() default -1;
+ RpcMethodRetryStrategy retry() default @RpcMethodRetryStrategy;
+
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethodRetryStrategy.java
similarity index 65%
copy from
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
copy to
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethodRetryStrategy.java
index 2dcf689ad8..5235ba55f2 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethodRetryStrategy.java
@@ -22,12 +22,27 @@ import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import java.net.ConnectException;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
-public @interface RpcMethod {
+public @interface RpcMethodRetryStrategy {
- long timeout() default -1;
+ /**
+ * The maximum number of attempts, including the first call. Default is 3, which means that the method is invoked at most 3 times in total (the first call plus up to 2 retries).
+ */
+ int maxRetryTimes() default 3;
+
+ /**
+ * The interval between retries, in milliseconds. If the value is less than or equal to 0, the retry is performed immediately without waiting.
+ */
+ long retryInterval() default 0;
+
+ /**
+ * The exception types that trigger a retry; a failure is retried when the thrown exception is an instance of any listed type.
+ * <p> Default is {@link ConnectException}.
+ */
+ Class<? extends Throwable>[] retryFor() default {ConnectException.class};
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/SyncRequestDto.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/SyncRequestDto.java
index 7df649cd57..b9ae76536b 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/SyncRequestDto.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/SyncRequestDto.java
@@ -32,7 +32,11 @@ import lombok.NoArgsConstructor;
public class SyncRequestDto {
private Host serverHost;
+
private Transporter transporter;
+
private long timeoutMillis;
+ private RpcMethodRetryStrategy retryStrategy;
+
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java
index 41ec3e056d..3bf723a577 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.extract.base.utils.Host;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
+import java.lang.reflect.UndeclaredThrowableException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -51,7 +52,13 @@ class ClientInvocationHandler implements InvocationHandler {
}
ClientMethodInvoker methodInvoker = methodInvokerMap.computeIfAbsent(
method.toGenericString(), m -> new
SyncClientMethodInvoker(serverHost, method, nettyRemotingClient));
- return methodInvoker.invoke(proxy, method, args);
+ try {
+ return methodInvoker.invoke(proxy, method, args);
+ } catch (UndeclaredThrowableException undeclaredThrowableException) {
+ throw undeclaredThrowableException.getCause();
+ } catch (Throwable throwable) {
+ throw throwable;
+ }
}
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
index 4aea4d6dfe..0d1f7a0f21 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
@@ -19,10 +19,11 @@ package org.apache.dolphinscheduler.extract.base.client;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.extract.base.IRpcResponse;
+import org.apache.dolphinscheduler.extract.base.RpcMethodRetryStrategy;
import org.apache.dolphinscheduler.extract.base.SyncRequestDto;
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
-import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
-import
org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException;
+import org.apache.dolphinscheduler.extract.base.exception.RemoteException;
+import
org.apache.dolphinscheduler.extract.base.exception.RemoteTimeoutException;
import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
import
org.apache.dolphinscheduler.extract.base.metrics.ClientSyncDurationMetrics;
import
org.apache.dolphinscheduler.extract.base.metrics.ClientSyncExceptionMetrics;
@@ -34,6 +35,7 @@ import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
import java.net.InetSocketAddress;
+import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
@@ -110,59 +112,79 @@ public class NettyRemotingClient implements AutoCloseable
{
isStarted.compareAndSet(false, true);
}
- public IRpcResponse sendSync(SyncRequestDto syncRequestDto) throws
RemotingException {
- long start = System.currentTimeMillis();
-
+ public IRpcResponse sendSync(final SyncRequestDto syncRequestDto) throws
RemoteException {
final Host host = syncRequestDto.getServerHost();
final Transporter transporter = syncRequestDto.getTransporter();
- final long timeoutMillis = syncRequestDto.getTimeoutMillis() < 0 ?
clientConfig.getConnectTimeoutMillis()
+ final long timeoutMillis = syncRequestDto.getTimeoutMillis() < 0 ?
clientConfig.getDefaultRpcTimeoutMillis()
: syncRequestDto.getTimeoutMillis();
- final long opaque = transporter.getHeader().getOpaque();
- try {
- final Channel channel = getOrCreateChannel(host);
- if (channel == null) {
- throw new RemotingException(String.format("connect to : %s
fail", host));
- }
- final ResponseFuture responseFuture = new ResponseFuture(opaque,
timeoutMillis);
- channel.writeAndFlush(transporter).addListener(future -> {
- if (future.isSuccess()) {
- responseFuture.setSendOk(true);
- return;
- } else {
- responseFuture.setSendOk(false);
+ final RpcMethodRetryStrategy retryStrategy =
syncRequestDto.getRetryStrategy();
+
+ int maxRetryTimes = retryStrategy.maxRetryTimes();
+ int currentExecuteTimes = 1;
+
+ while (true) {
+ final long start = System.currentTimeMillis();
+ try {
+ return doSendSync(transporter, host, timeoutMillis);
+ } catch (Exception ex) {
+ ClientSyncExceptionMetrics clientSyncExceptionMetrics =
+ ClientSyncExceptionMetrics.of(syncRequestDto, ex);
+
RpcMetrics.recordClientSyncRequestException(clientSyncExceptionMetrics);
+
+ if (currentExecuteTimes < maxRetryTimes
+ && Arrays.stream(retryStrategy.retryFor()).anyMatch(e
-> e.isInstance(ex))) {
+ currentExecuteTimes++;
+ if (retryStrategy.retryInterval() > 0) {
+ ThreadUtils.sleep(retryStrategy.retryInterval());
+ }
+ continue;
}
- responseFuture.setCause(future.cause());
- responseFuture.putResponse(null);
- log.error("Send Sync request {} to host {} failed",
transporter, host, responseFuture.getCause());
- });
- /*
- * sync wait for result
- */
- IRpcResponse iRpcResponse = responseFuture.waitResponse();
- if (iRpcResponse == null) {
- if (responseFuture.isSendOK()) {
- throw new RemotingTimeoutException(host.toString(),
timeoutMillis, responseFuture.getCause());
+
+ if (ex instanceof RemoteException) {
+ throw (RemoteException) ex;
} else {
- throw new RemotingException(host.toString(),
responseFuture.getCause());
+ throw new RemoteException("Call method to " + host + "
failed", ex);
}
+ } finally {
+ ClientSyncDurationMetrics clientSyncDurationMetrics =
ClientSyncDurationMetrics
+ .of(syncRequestDto)
+ .withMilliseconds(System.currentTimeMillis() - start);
+
RpcMetrics.recordClientSyncRequestDuration(clientSyncDurationMetrics);
}
- return iRpcResponse;
- } catch (Exception ex) {
- ClientSyncExceptionMetrics clientSyncExceptionMetrics =
ClientSyncExceptionMetrics
- .of(syncRequestDto)
- .withThrowable(ex);
-
RpcMetrics.recordClientSyncRequestException(clientSyncExceptionMetrics);
- if (ex instanceof RemotingException) {
- throw (RemotingException) ex;
+ }
+ }
+
+ private IRpcResponse doSendSync(final Transporter transporter,
+ final Host serverHost,
+ long timeoutMills) throws RemoteException,
InterruptedException {
+ final Channel channel = getOrCreateChannel(serverHost);
+ if (channel == null) {
+ throw new RemoteException(String.format("connect to : %s fail",
serverHost));
+ }
+ final ResponseFuture responseFuture = new
ResponseFuture(transporter.getHeader().getOpaque(), timeoutMills);
+ channel.writeAndFlush(transporter).addListener(future -> {
+ if (future.isSuccess()) {
+ responseFuture.setSendOk(true);
+ return;
} else {
- throw new RemotingException(ex);
+ responseFuture.setSendOk(false);
}
- } finally {
- ClientSyncDurationMetrics clientSyncDurationMetrics =
ClientSyncDurationMetrics
- .of(syncRequestDto)
- .withMilliseconds(System.currentTimeMillis() - start);
-
RpcMetrics.recordClientSyncRequestDuration(clientSyncDurationMetrics);
+ responseFuture.setCause(future.cause());
+ responseFuture.putResponse(null);
+ log.error("Send Sync request {} to host {} failed", transporter,
serverHost, responseFuture.getCause());
+ });
+ /*
+ * sync wait for result
+ */
+ final IRpcResponse iRpcResponse = responseFuture.waitResponse();
+ if (iRpcResponse != null) {
+ return iRpcResponse;
+ }
+ if (responseFuture.isSendOK()) {
+ throw new RemoteTimeoutException(serverHost.toString(),
timeoutMills, responseFuture.getCause());
+ } else {
+ throw new RemoteException(serverHost.toString(),
responseFuture.getCause());
}
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
index ccbdad945b..5a09af2c56 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
@@ -38,12 +38,13 @@ class SyncClientMethodInvoker extends
AbstractClientMethodInvoker {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws
Throwable {
RpcMethod sync = method.getAnnotation(RpcMethod.class);
- Transporter transporter = new Transporter();
-
transporter.setBody(JsonSerializer.serialize(StandardRpcRequest.of(args)));
- transporter.setHeader(TransporterHeader.of(methodIdentifier));
+ final Transporter transporter = Transporter.of(
+ TransporterHeader.of(methodIdentifier),
+ JsonSerializer.serialize(StandardRpcRequest.of(args)));
- SyncRequestDto syncRequestDto = SyncRequestDto.builder()
+ final SyncRequestDto syncRequestDto = SyncRequestDto.builder()
.timeoutMillis(sync.timeout())
+ .retryStrategy(sync.retry())
.transporter(transporter)
.serverHost(serverHost)
.build();
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemotingTimeoutException.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemoteTimeoutException.java
similarity index 79%
rename from
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemotingTimeoutException.java
rename to
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemoteTimeoutException.java
index 22b77bf1f5..abbd374250 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemotingTimeoutException.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemoteTimeoutException.java
@@ -20,17 +20,17 @@ package org.apache.dolphinscheduler.extract.base.exception;
/**
* timeout exception
*/
-public class RemotingTimeoutException extends RemotingException {
+public class RemoteTimeoutException extends RemoteException {
- public RemotingTimeoutException(String message) {
+ public RemoteTimeoutException(String message) {
super(message);
}
- public RemotingTimeoutException(String address, long timeoutMillis) {
+ public RemoteTimeoutException(String address, long timeoutMillis) {
this(address, timeoutMillis, null);
}
- public RemotingTimeoutException(String address, long timeoutMillis,
Throwable cause) {
+ public RemoteTimeoutException(String address, long timeoutMillis,
Throwable cause) {
super(String.format("wait response on the channel %s timeout %s",
address, timeoutMillis), cause);
}
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemotingException.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemotingException.java
deleted file mode 100644
index 4290362a01..0000000000
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemotingException.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.extract.base.exception;
-
-/**
- * remote exception
- */
-public class RemotingException extends Exception {
-
- public RemotingException() {
- super();
- }
-
- /**
- * Construct a new runtime exception with the detail message
- *
- * @param message detail message
- */
- public RemotingException(String message) {
- super(message);
- }
-
- /**
- * Construct a new runtime exception with the detail message and cause
- *
- * @param message the detail message
- * @param cause the cause
- * @since 1.4
- */
- public RemotingException(String message, Throwable cause) {
- super(message, cause);
- }
-
- /**
- * Construct a new runtime exception with throwable
- *
- * @param cause the cause
- */
- public RemotingException(Throwable cause) {
- super(cause);
- }
-
-}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemotingTooMuchRequestException.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemotingTooMuchRequestException.java
deleted file mode 100644
index 4a6eb0f46f..0000000000
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/exception/RemotingTooMuchRequestException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.extract.base.exception;
-
-/**
- * too much request exception
- */
-public class RemotingTooMuchRequestException extends RemotingException {
-
- public RemotingTooMuchRequestException(String message) {
- super(message);
- }
-}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java
index 1fbbd9ed6c..51742323eb 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java
@@ -36,7 +36,6 @@ public class ResponseFuture {
private final long opaque;
- // remove the timeout
private final long timeoutMillis;
private final CountDownLatch latch = new CountDownLatch(1);
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/ClientSyncExceptionMetrics.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/ClientSyncExceptionMetrics.java
index 6f91132547..331cd4876a 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/ClientSyncExceptionMetrics.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/ClientSyncExceptionMetrics.java
@@ -34,23 +34,18 @@ public class ClientSyncExceptionMetrics {
private Transporter transporter;
- private String clientHost;
-
@Builder.Default
- private String serverHost = NetUtils.getHost();
+ private String clientHost = NetUtils.getHost();
+
+ private String serverAddress;
private Throwable throwable;
- public static ClientSyncExceptionMetrics of(SyncRequestDto syncRequestDto)
{
+ public static ClientSyncExceptionMetrics of(final SyncRequestDto
syncRequestDto, final Throwable throwable) {
return ClientSyncExceptionMetrics.builder()
.transporter(syncRequestDto.getTransporter())
+ .serverAddress(syncRequestDto.getServerHost().getAddress())
+ .throwable(throwable)
.build();
-
}
-
- public ClientSyncExceptionMetrics withThrowable(Throwable throwable) {
- this.throwable = throwable;
- return this;
- }
-
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/RpcMetrics.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/RpcMetrics.java
index c2dbbfefa6..8ee06e32dd 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/RpcMetrics.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/RpcMetrics.java
@@ -42,9 +42,9 @@ public class RpcMetrics {
.map(ClientSyncExceptionMetrics::getTransporter)
.map(Transporter::getHeader)
.map(TransporterHeader::getMethodIdentifier)
- .orElseGet(() -> "unknown"),
+ .orElse("unknown"),
clientSyncExceptionMetrics.getClientHost(),
- clientSyncExceptionMetrics.getServerHost());
+ clientSyncExceptionMetrics.getServerAddress());
}
public static void recordClientSyncRequestException(final Throwable
throwable,
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/protocal/TransporterEncoder.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/protocal/TransporterEncoder.java
index 246a313b6c..ca9990d9c4 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/protocal/TransporterEncoder.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/protocal/TransporterEncoder.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.extract.base.protocal;
-import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
+import org.apache.dolphinscheduler.extract.base.exception.RemoteException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
@@ -28,9 +28,9 @@ import io.netty.handler.codec.MessageToByteEncoder;
public class TransporterEncoder extends MessageToByteEncoder<Transporter> {
@Override
- protected void encode(ChannelHandlerContext ctx, Transporter transporter,
ByteBuf out) throws Exception {
+ protected void encode(ChannelHandlerContext ctx, Transporter transporter,
ByteBuf out) {
if (transporter == null) {
- throw new RemotingException("encode msg is null");
+ throw new RemoteException("encode msg is null");
}
out.writeByte(Transporter.MAGIC);
out.writeByte(Transporter.VERSION);
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/ClientsTest.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/ClientsTest.java
index 72a3889b13..cf1e60dead 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/ClientsTest.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/ClientsTest.java
@@ -19,16 +19,22 @@ package org.apache.dolphinscheduler.extract.base.client;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import org.apache.dolphinscheduler.extract.base.RpcMethod;
+import org.apache.dolphinscheduler.extract.base.RpcMethodRetryStrategy;
import org.apache.dolphinscheduler.extract.base.RpcService;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import
org.apache.dolphinscheduler.extract.base.exception.MethodInvocationException;
+import org.apache.dolphinscheduler.extract.base.exception.RemoteException;
import
org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
+import java.time.Duration;
+
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -61,6 +67,17 @@ public class ClientsTest {
Assertions.assertNotNull(proxyClient);
}
+ @Test
+ public void testPingWithRetry() {
+ IService proxyClient = Clients
+ .withService(IService.class)
+ .withHost("localhost:1");
+ Awaitility.await()
+ .atMost(Duration.ofMinutes(1))
+ .atLeast(Duration.ofSeconds(3))
+ .untilAsserted(() -> assertThrows(RemoteException.class, () ->
proxyClient.ping("ping")));
+ }
+
@Test
public void testPing() {
IService proxyClient = Clients
@@ -69,7 +86,7 @@ public class ClientsTest {
assertEquals("pong", proxyClient.ping("ping"));
MethodInvocationException methodInvocationException =
- Assertions.assertThrows(MethodInvocationException.class, () ->
proxyClient.ping(null));
+ assertThrows(MethodInvocationException.class, () ->
proxyClient.ping(null));
assertEquals("ping: null is illegal",
methodInvocationException.getMessage());
}
@@ -89,7 +106,7 @@ public class ClientsTest {
@RpcService
public interface IService {
- @RpcMethod
+ @RpcMethod(retry = @RpcMethodRetryStrategy(maxRetryTimes = 4,
retryInterval = 1_000))
String ping(String ping);
@RpcMethod