This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 86ef9666c4 Fix exception occur in RpcServer side, it will not be sent
to RpcClient (#15536)
86ef9666c4 is described below
commit 86ef9666c4caca83b703b742e07e5d371f3f4835
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Jan 30 17:54:29 2024 +0800
Fix exception occur in RpcServer side, it will not be sent to RpcClient
(#15536)
---
.../extract/base/StandardRpcRequest.java | 2 +-
.../extract/base/config/NettyServerConfig.java | 6 ++++
.../base/server/ServerMethodInvokerImpl.java | 7 +++-
...ngletonJdkDynamicRpcClientProxyFactoryTest.java | 37 ++++++++++++++++------
4 files changed, 41 insertions(+), 11 deletions(-)
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/StandardRpcRequest.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/StandardRpcRequest.java
index f3f0d9fb3a..d74bc08d61 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/StandardRpcRequest.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/StandardRpcRequest.java
@@ -40,7 +40,7 @@ public class StandardRpcRequest implements IRpcRequest {
final Class<?>[] argsTypes = new Class[args.length];
for (int i = 0; i < args.length; i++) {
argsBytes[i] = JsonSerializer.serialize(args[i]);
- argsTypes[i] = args[i].getClass();
+ argsTypes[i] = args[i] == null ? null : args[i].getClass();
}
return new StandardRpcRequest(argsBytes, argsTypes);
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java
index d432d2cc75..9d4a2ee3d2 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java
@@ -33,31 +33,37 @@ public class NettyServerConfig {
/**
* init the server connectable queue
*/
+ @Builder.Default
private int soBacklog = 1024;
/**
* whether tpc delay
*/
+ @Builder.Default
private boolean tcpNoDelay = true;
/**
* whether keep alive
*/
+ @Builder.Default
private boolean soKeepalive = true;
/**
* send buffer size
*/
+ @Builder.Default
private int sendBufferSize = 65535;
/**
* receive buffer size
*/
+ @Builder.Default
private int receiveBufferSize = 65535;
/**
* worker threads,default get machine cpus
*/
+ @Builder.Default
private int workerThread = Runtime.getRuntime().availableProcessors() * 2;
/**
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerImpl.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerImpl.java
index f81b901a5f..eea9da5e14 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerImpl.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerImpl.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.extract.base.server;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
public class ServerMethodInvokerImpl implements ServerMethodInvoker {
@@ -36,7 +37,11 @@ public class ServerMethodInvokerImpl implements
ServerMethodInvoker {
@Override
public Object invoke(Object... args) throws Throwable {
// todo: check the request param when register
- return method.invoke(serviceBean, args);
+ try {
+ return method.invoke(serviceBean, args);
+ } catch (InvocationTargetException ex) {
+ throw ex.getTargetException();
+ }
}
@Override
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java
index 2c80773301..521cf7c75a 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java
@@ -17,12 +17,19 @@
package org.apache.dolphinscheduler.extract.base.client;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.RpcService;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
+import
org.apache.dolphinscheduler.extract.base.exception.MethodInvocationException;
import
org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -32,12 +39,18 @@ public class SingletonJdkDynamicRpcClientProxyFactoryTest {
private NettyRemotingServer nettyRemotingServer;
+ private String serverAddress;
+
@BeforeEach
public void setUp() {
- nettyRemotingServer =
- new
NettyRemotingServer(NettyServerConfig.builder().serverName("ApiServer").listenPort(12345).build());
+ int listenPort = RandomUtils.nextInt(10000, 20000);
+ NettyServerConfig nettyServerConfig = NettyServerConfig.builder()
+ .serverName("ApiServer")
+ .listenPort(listenPort)
+ .build();
+ nettyRemotingServer = new NettyRemotingServer(nettyServerConfig);
nettyRemotingServer.start();
-
+ serverAddress = "localhost:" + listenPort;
new SpringServerMethodInvokerDiscovery(nettyRemotingServer)
.postProcessAfterInitialization(new IServiceImpl(),
"iServiceImpl");
}
@@ -45,23 +58,26 @@ public class SingletonJdkDynamicRpcClientProxyFactoryTest {
@Test
public void getProxyClient() {
IService proxyClient =
-
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345",
IService.class);
+
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(serverAddress,
IService.class);
Assertions.assertNotNull(proxyClient);
}
@Test
public void testPing() {
IService proxyClient =
-
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345",
IService.class);
- String ping = proxyClient.ping("ping");
- Assertions.assertEquals("pong", ping);
+
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(serverAddress,
IService.class);
+ assertEquals("pong", proxyClient.ping("ping"));
+
+ MethodInvocationException methodInvocationException =
+ Assertions.assertThrows(MethodInvocationException.class, () ->
proxyClient.ping(null));
+ assertEquals("ping: null is illegal",
methodInvocationException.getMessage());
}
@Test
public void testVoid() {
IService proxyClient =
-
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345",
IService.class);
- Assertions.assertDoesNotThrow(proxyClient::voidMethod);
+
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(serverAddress,
IService.class);
+ assertDoesNotThrow(proxyClient::voidMethod);
}
@AfterEach
@@ -83,6 +99,9 @@ public class SingletonJdkDynamicRpcClientProxyFactoryTest {
@Override
public String ping(String ping) {
+ if (StringUtils.isEmpty(ping)) {
+ throw new IllegalArgumentException("ping: " + ping + " is
illegal");
+ }
return "pong";
}